1#![forbid(unsafe_code)]
8
9pub use jmap_types::{
10 Argument, Id, Invocation, JmapError, JmapRequest, JmapResponse, ResultReference, State, UTCDate,
11};
12
13pub mod backend;
14pub mod handlers;
15mod helpers;
16
17pub use backend::{
18 AddedItem, BackendChangesError, BackendSetError, ChangesResult, GetObject, JmapBackend,
19 JmapObject, QueryChangesResult, QueryObject, QueryResult, SetError, SetErrorType, SetObject,
20};
21pub use handlers::{handle_changes, handle_get, handle_query, handle_query_changes};
22pub use helpers::{
23 extract_account_id, json_merge_patch, not_found_json, now_utc_string, ser,
24 MAX_MERGE_PATCH_DEPTH,
25};
26
27mod parse;
28mod response;
29
30pub use parse::{check_known_capabilities, parse_request, resolve_args};
31pub use response::{error_invocation, error_status, request_error, RequestError};
32
33use std::{collections::HashMap, fmt, future::Future, pin::Pin, sync::Arc};
34
35use serde_json::Value;
36use tokio::task;
37
/// Boxed, `Send` future returned by [`JmapHandler::call`].
///
/// On success the future resolves to a `(Value, Vec<Invocation>)` pair:
/// [`Dispatcher::dispatch`] uses the `Value` as the arguments of the primary
/// response invocation and appends the extra invocations directly after it.
/// On failure it resolves to a [`JmapError`], which the dispatcher turns into
/// an error invocation for that call.
pub type HandlerFuture =
    Pin<Box<dyn Future<Output = Result<(Value, Vec<Invocation>), JmapError>> + Send>>;
49
/// A handler that can be registered with a [`Dispatcher`] under a method name.
///
/// `CallerCtx` is an application-defined value describing the caller; the
/// dispatcher clones it once per method call and passes the clone to the
/// handler.
pub trait JmapHandler<CallerCtx>: Send + Sync {
    /// Handles a single method call and returns its (boxed) result future.
    ///
    /// * `method` - the method name of the call being handled.
    /// * `call_id` - the call id taken from the request invocation.
    /// * `args` - the call arguments; when invoked through
    ///   [`Dispatcher::dispatch`], `resolve_args` has already been applied to
    ///   them.
    /// * `caller` - the caller context for this request.
    fn call(
        &self,
        method: String,
        call_id: String,
        args: Value,
        caller: CallerCtx,
    ) -> HandlerFuture;
}
83
/// Routes the method calls of a JMAP request to handlers registered by
/// method name.
///
/// `CallerCtx` is the application-defined caller context handed to every
/// handler.
pub struct Dispatcher<CallerCtx> {
    /// Registered handlers, keyed by the exact method name used at
    /// registration time.
    handlers: HashMap<String, Arc<dyn JmapHandler<CallerCtx>>>,
}
102
103impl<CallerCtx: Clone + Send + 'static> Dispatcher<CallerCtx> {
104 pub fn new() -> Self {
106 Self {
107 handlers: HashMap::new(),
108 }
109 }
110
111 pub fn register(
118 &mut self,
119 method: impl Into<String>,
120 handler: Arc<dyn JmapHandler<CallerCtx>>,
121 ) {
122 self.handlers.insert(method.into(), handler);
123 }
124
125 pub async fn dispatch(
143 &self,
144 request: JmapRequest,
145 caller: CallerCtx,
146 session_state: State,
147 ) -> JmapResponse {
148 let mut method_responses: Vec<Invocation> = Vec::with_capacity(request.method_calls.len());
149 let client_sent_created_ids = request.created_ids.is_some();
150 let mut created_ids: HashMap<Id, Id> = request.created_ids.unwrap_or_default();
151
152 for (method, mut args, call_id) in request.method_calls {
154 if let Err(e) = resolve_args(&mut args, &method_responses) {
156 method_responses.push(error_invocation(&call_id, e));
157 continue;
158 }
159
160 let handler = match self.handlers.get(&method) {
162 Some(h) => Arc::clone(h),
163 None => {
164 method_responses.push(error_invocation(&call_id, JmapError::unknown_method()));
165 continue;
166 }
167 };
168
169 let caller_clone = caller.clone();
170 let method_clone = method.clone();
171 let call_id_clone = call_id.clone();
172
173 let result: Result<
175 Result<(Value, Vec<Invocation>), JmapError>,
176 tokio::task::JoinError,
177 > = task::spawn(async move {
178 handler
179 .call(method_clone, call_id_clone, args, caller_clone)
180 .await
181 })
182 .await;
183
184 match result {
185 Ok(Ok((primary_value, extra_invocations))) => {
186 if client_sent_created_ids {
190 if let Some(map) = primary_value.get("created").and_then(|v| v.as_object())
191 {
192 for (client_id, created_obj) in map {
193 if let Some(id_val) = created_obj.get("id").and_then(|v| v.as_str())
199 {
200 created_ids
201 .insert(Id::from(client_id.as_str()), Id::from(id_val));
202 }
203 }
204 }
205 }
206 method_responses.push((method, primary_value, call_id));
210 method_responses.extend(extra_invocations);
211 }
212 Ok(Err(e)) => {
213 method_responses.push(error_invocation(&call_id, e));
214 }
215 Err(join_err) => {
216 let desc = if join_err.is_cancelled() {
219 "task cancelled"
220 } else {
221 "internal error"
222 };
223 method_responses.push(error_invocation(&call_id, JmapError::server_fail(desc)));
224 }
225 }
226 }
227
228 let created_ids = client_sent_created_ids.then_some(created_ids);
229
230 JmapResponse::new(method_responses, session_state, created_ids)
231 }
232}
233
234impl<CallerCtx: Clone + Send + 'static> Default for Dispatcher<CallerCtx> {
235 fn default() -> Self {
236 Self::new()
237 }
238}
239
240impl<CallerCtx> fmt::Debug for Dispatcher<CallerCtx> {
241 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
242 f.debug_struct("Dispatcher")
243 .field("methods", &self.handlers.keys())
244 .finish()
245 }
246}
247
/// Signature of the closure stored in a [`ClosureHandler`].
///
/// The closure receives, in order: the shared backend, the call id, the call
/// arguments and the caller context, and returns a [`HandlerFuture`]. The
/// method name is not part of the signature.
pub type BackendCallFn<B, C> =
    dyn Fn(Arc<B>, String, serde_json::Value, C) -> HandlerFuture + Send + Sync + 'static;
264
/// Adapter that turns a closure plus a shared backend into a [`JmapHandler`].
///
/// `B` is the backend type and `C` the caller-context type.
pub struct ClosureHandler<B: Send + Sync + 'static, C: Clone + Send + 'static> {
    /// Backend shared with the closure; a fresh `Arc` clone is passed on
    /// every call.
    pub backend: Arc<B>,
    /// Closure that performs the actual work for each call.
    pub call_fn: Box<BackendCallFn<B, C>>,
}
304
305impl<B: Send + Sync + 'static, C: Clone + Send + 'static> JmapHandler<C> for ClosureHandler<B, C> {
306 fn call(
307 &self,
308 _method: String,
309 call_id: String,
310 args: serde_json::Value,
311 caller: C,
312 ) -> HandlerFuture {
313 (self.call_fn)(Arc::clone(&self.backend), call_id, args, caller)
314 }
315}
316
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::{json, Value};
    use std::sync::{Arc, Mutex};

    /// Panic message used whenever a test locks one of its capture mutexes.
    const POISONED: &str = "test: mutex poisoned";

    /// Compile-time check that `Dispatcher` is `Send + Sync`.
    // Never called: the function only has to type-check, hence `dead_code`.
    #[allow(dead_code)]
    fn assert_dispatcher_send_sync() {
        fn check<T: Send + Sync>() {}
        check::<Dispatcher<String>>();
        check::<Dispatcher<()>>();
    }

    /// Handler that always succeeds with a fixed value and no extra
    /// invocations, for any caller-context type.
    struct EchoHandler(Value);

    impl<C: Clone + Send + 'static> JmapHandler<C> for EchoHandler {
        fn call(
            &self,
            _method: String,
            _call_id: String,
            _args: Value,
            _caller: C,
        ) -> HandlerFuture {
            let v = self.0.clone();
            Box::pin(async move { Ok((v, vec![])) })
        }
    }

    /// Handler that always fails with a fixed error.
    struct ErrorHandler(JmapError);

    impl JmapHandler<String> for ErrorHandler {
        fn call(
            &self,
            _method: String,
            _call_id: String,
            _args: Value,
            _caller: String,
        ) -> HandlerFuture {
            let e = self.0.clone();
            Box::pin(async move { Err(e) })
        }
    }

    /// Handler that records the arguments it was called with.
    struct CaptureArgsHandler(Arc<Mutex<Option<Value>>>);

    impl JmapHandler<String> for CaptureArgsHandler {
        fn call(
            &self,
            _method: String,
            _call_id: String,
            args: Value,
            _caller: String,
        ) -> HandlerFuture {
            let slot = self.0.clone();
            Box::pin(async move {
                *slot.lock().expect(POISONED) = Some(args);
                Ok((json!({}), vec![]))
            })
        }
    }

    /// Handler that records the caller context it was called with.
    struct CaptureCallerHandler(Arc<Mutex<Option<String>>>);

    impl JmapHandler<String> for CaptureCallerHandler {
        fn call(
            &self,
            _method: String,
            _call_id: String,
            _args: Value,
            caller: String,
        ) -> HandlerFuture {
            let slot = self.0.clone();
            Box::pin(async move {
                *slot.lock().expect(POISONED) = Some(caller);
                Ok((json!({}), vec![]))
            })
        }
    }

    /// Handler whose future panics as soon as it is polled.
    struct PanicHandler;

    impl JmapHandler<String> for PanicHandler {
        fn call(
            &self,
            _method: String,
            _call_id: String,
            _args: Value,
            _caller: String,
        ) -> HandlerFuture {
            Box::pin(async move { panic!("deliberate test panic") })
        }
    }

    /// Builds a request containing exactly one method call and no
    /// `createdIds`.
    fn single_call(method: &str, args: Value, call_id: &str) -> JmapRequest {
        JmapRequest::new(
            vec!["urn:ietf:params:jmap:core".into()],
            vec![(method.into(), args, call_id.into())],
            None,
        )
    }

    /// A call to an unregistered method yields an `unknownMethod` error that
    /// carries the original call id.
    #[tokio::test]
    async fn unknown_method_returns_error_invocation() {
        let d: Dispatcher<String> = Dispatcher::new();
        let req = single_call("Foo/get", json!({}), "c0");
        let resp = d.dispatch(req, "alice".into(), "s0".into()).await;
        assert_eq!(resp.method_responses.len(), 1);
        let (_, args, call_id) = &resp.method_responses[0];
        assert_eq!(call_id, "c0");
        assert_eq!(args["type"], "unknownMethod");
    }

    /// A successful handler result is returned under the request's method
    /// name and call id.
    #[tokio::test]
    async fn known_method_success() {
        let mut d: Dispatcher<String> = Dispatcher::new();
        d.register("Foo/get", Arc::new(EchoHandler(json!({"list": []}))));
        let req = single_call("Foo/get", json!({}), "c1");
        let resp = d.dispatch(req, "alice".into(), "s0".into()).await;
        assert_eq!(resp.method_responses.len(), 1);
        let (method, args, call_id) = &resp.method_responses[0];
        assert_eq!(method, "Foo/get");
        assert_eq!(call_id, "c1");
        assert_eq!(args["list"], json!([]));
    }

    /// A handler error is converted into an error invocation.
    #[tokio::test]
    async fn handler_returns_error() {
        let mut d: Dispatcher<String> = Dispatcher::new();
        d.register("Foo/get", Arc::new(ErrorHandler(JmapError::not_found())));
        let req = single_call("Foo/get", json!({}), "c2");
        let resp = d.dispatch(req, "alice".into(), "s0".into()).await;
        assert_eq!(resp.method_responses.len(), 1);
        let (_, args, _) = &resp.method_responses[0];
        assert_eq!(args["type"], "notFound");
    }

    /// The session state passed to `dispatch` is copied into the response.
    #[tokio::test]
    async fn session_state_echoed() {
        let d: Dispatcher<String> = Dispatcher::new();
        let req = JmapRequest::new(vec!["urn:ietf:params:jmap:core".into()], vec![], None);
        let resp = d.dispatch(req, "alice".into(), "my-state-123".into()).await;
        assert_eq!(resp.session_state.as_ref(), "my-state-123");
    }

    /// Known and unknown methods in one batch each produce a response, in
    /// request order.
    #[tokio::test]
    async fn mixed_batch_all_responses_in_order() {
        let mut d: Dispatcher<String> = Dispatcher::new();
        d.register("M/a", Arc::new(EchoHandler(json!({"ok": true}))));
        let req = JmapRequest::new(
            vec!["urn:ietf:params:jmap:core".into()],
            vec![
                ("M/a".into(), json!({}), "c0".into()),
                ("M/b".into(), json!({}), "c1".into()),
                ("M/a".into(), json!({}), "c2".into()),
            ],
            None,
        );
        let resp = d.dispatch(req, "alice".into(), "s0".into()).await;
        assert_eq!(
            resp.method_responses.len(),
            3,
            "all three calls must produce a response"
        );
        assert_eq!(resp.method_responses[0].2, "c0");
        assert!(
            resp.method_responses[0].1.get("type").is_none(),
            "c0 must not be an error"
        );
        assert_eq!(resp.method_responses[1].2, "c1");
        assert_eq!(resp.method_responses[1].1["type"], "unknownMethod");
        assert_eq!(resp.method_responses[2].2, "c2");
        assert!(
            resp.method_responses[2].1.get("type").is_none(),
            "c2 must not be an error"
        );
    }

    /// A failing call does not prevent later calls from running.
    #[tokio::test]
    async fn error_does_not_abort_subsequent_calls() {
        let mut d: Dispatcher<String> = Dispatcher::new();
        d.register("M/ok", Arc::new(EchoHandler(json!({"ok": true}))));
        d.register("M/err", Arc::new(ErrorHandler(JmapError::forbidden())));
        let req = JmapRequest::new(
            vec!["urn:ietf:params:jmap:core".into()],
            vec![
                ("M/err".into(), json!({}), "c0".into()),
                ("M/ok".into(), json!({}), "c1".into()),
            ],
            None,
        );
        let resp = d.dispatch(req, "alice".into(), "s0".into()).await;
        assert_eq!(resp.method_responses.len(), 2);
        assert_eq!(resp.method_responses[0].1["type"], "forbidden");
        assert!(
            resp.method_responses[1].1.get("type").is_none(),
            "second call must succeed"
        );
    }

    /// A panic inside a handler is contained and reported as `serverFail`.
    #[tokio::test]
    async fn panicking_handler_returns_server_fail() {
        let mut d: Dispatcher<String> = Dispatcher::new();
        d.register("Panic/now", Arc::new(PanicHandler));
        let req = single_call("Panic/now", json!({}), "c0");
        let resp = d.dispatch(req, "alice".into(), "s0".into()).await;
        assert_eq!(resp.method_responses.len(), 1);
        let (_, args, _) = &resp.method_responses[0];
        assert_eq!(
            args["type"], "serverFail",
            "panicking handler must produce serverFail"
        );
    }

    /// The panic payload must never be exposed to the client.
    #[tokio::test]
    async fn panic_message_not_in_response() {
        let mut d: Dispatcher<String> = Dispatcher::new();
        d.register("Panic/now", Arc::new(PanicHandler));
        let req = single_call("Panic/now", json!({}), "c0");
        let resp = d.dispatch(req, "alice".into(), "s0".into()).await;
        let (_, args, _) = &resp.method_responses[0];
        if let Some(desc) = args["description"].as_str() {
            assert!(
                !desc.contains("deliberate test panic"),
                "panic message must not leak into response description"
            );
        }
    }

    /// `#`-prefixed result references are resolved against earlier responses
    /// before the handler sees the arguments.
    #[tokio::test]
    async fn result_reference_resolved_before_dispatch() {
        let captured = Arc::new(Mutex::new(None::<Value>));
        let mut d: Dispatcher<String> = Dispatcher::new();
        d.register(
            "Foo/get",
            Arc::new(EchoHandler(json!({"list": [{"id": "item-1"}]}))),
        );
        d.register(
            "Bar/query",
            Arc::new(CaptureArgsHandler(Arc::clone(&captured))),
        );
        let req = JmapRequest::new(
            vec!["urn:ietf:params:jmap:core".into()],
            vec![
                ("Foo/get".into(), json!({}), "c0".into()),
                (
                    "Bar/query".into(),
                    json!({"#ids": {"resultOf": "c0", "name": "Foo/get", "path": "/list/0/id"}}),
                    "c1".into(),
                ),
            ],
            None,
        );
        let resp = d.dispatch(req, "alice".into(), "s0".into()).await;
        assert_eq!(resp.method_responses.len(), 2);
        assert!(
            resp.method_responses[1].1.get("type").is_none(),
            "Bar/query must succeed after ResultReference resolution"
        );
        let got = captured
            .lock()
            .expect(POISONED)
            .clone()
            .expect("CaptureArgsHandler was not called");
        assert_eq!(
            got["ids"],
            json!("item-1"),
            "resolved value must be the string item-1"
        );
        assert!(
            got.get("#ids").is_none(),
            "#ids key must have been replaced"
        );
    }

    /// An unresolvable result reference turns the call into an error
    /// invocation.
    #[tokio::test]
    async fn result_reference_failure_stops_that_call() {
        let d: Dispatcher<String> = Dispatcher::new();
        let req = single_call(
            "Foo/get",
            json!({"#ids": {"resultOf": "nonexistent", "name": "Foo/get", "path": "/x"}}),
            "c0",
        );
        let resp = d.dispatch(req, "alice".into(), "s0".into()).await;
        assert_eq!(resp.method_responses.len(), 1);
        let (_, args, _) = &resp.method_responses[0];
        assert!(
            args.get("type").is_some(),
            "failed ResultReference must produce an error invocation"
        );
    }

    /// With `createdIds` present in the request, ids created by a `/set`
    /// response are added to the response map.
    #[tokio::test]
    async fn created_ids_accumulated_from_set_response() {
        let mut d: Dispatcher<String> = Dispatcher::new();
        d.register(
            "Foo/set",
            Arc::new(EchoHandler(
                json!({"created": {"client-1": {"id": "server-abc"}}}),
            )),
        );
        let req = JmapRequest::new(
            vec!["urn:ietf:params:jmap:core".into()],
            vec![("Foo/set".into(), json!({}), "c0".into())],
            Some(std::collections::HashMap::new()),
        );
        let resp = d.dispatch(req, "alice".into(), "s0".into()).await;
        let ids = resp
            .created_ids
            .as_ref()
            .expect("created_ids must be Some when client sent createdIds");
        assert_eq!(
            ids.get(&Id::from("client-1")),
            Some(&Id::from("server-abc")),
            "client-1 must map to server-abc"
        );
    }

    /// Without `createdIds` in the request (and no `/set` call) the response
    /// carries no map.
    #[tokio::test]
    async fn created_ids_absent_when_no_set() {
        let mut d: Dispatcher<String> = Dispatcher::new();
        d.register("Foo/get", Arc::new(EchoHandler(json!({"list": []}))));
        let req = single_call("Foo/get", json!({}), "c0");
        let resp = d.dispatch(req, "alice".into(), "s0".into()).await;
        assert!(
            resp.created_ids.is_none(),
            "created_ids must be None when the request carried no createdIds"
        );
    }

    /// Even when a `/set` call creates an object, the map stays absent if the
    /// client did not send `createdIds`.
    #[tokio::test]
    async fn created_ids_absent_when_client_did_not_send_them() {
        let mut d: Dispatcher<String> = Dispatcher::new();
        d.register(
            "Foo/set",
            Arc::new(EchoHandler(
                json!({"created": {"client-1": {"id": "server-abc"}}}),
            )),
        );
        let req = single_call("Foo/set", json!({}), "c0");
        let resp = d.dispatch(req, "alice".into(), "s0".into()).await;
        assert_eq!(resp.method_responses.len(), 1);
        assert!(
            resp.method_responses[0].1.get("created").is_some(),
            "the /set response itself must still report the created object"
        );
        assert!(
            resp.created_ids.is_none(),
            "created_ids must be None when the request carried no createdIds"
        );
    }

    /// Ids created by several `/set` calls of one request all end up in the
    /// map.
    #[tokio::test]
    async fn created_ids_accumulated_across_multiple_set_calls() {
        let mut d: Dispatcher<String> = Dispatcher::new();
        d.register(
            "A/set",
            Arc::new(EchoHandler(json!({"created": {"cA": {"id": "sA"}}}))),
        );
        d.register(
            "B/set",
            Arc::new(EchoHandler(json!({"created": {"cB": {"id": "sB"}}}))),
        );
        let req = JmapRequest::new(
            vec!["urn:ietf:params:jmap:core".into()],
            vec![
                ("A/set".into(), json!({}), "c0".into()),
                ("B/set".into(), json!({}), "c1".into()),
            ],
            Some(std::collections::HashMap::new()),
        );
        let resp = d.dispatch(req, "alice".into(), "s0".into()).await;
        let ids = resp
            .created_ids
            .as_ref()
            .expect("created_ids must be Some when client sent createdIds");
        assert_eq!(
            ids.get(&Id::from("cA")),
            Some(&Id::from("sA")),
            "cA must be present"
        );
        assert_eq!(
            ids.get(&Id::from("cB")),
            Some(&Id::from("sB")),
            "cB must be present"
        );
    }

    /// Entries supplied by the client are kept and new ones are merged in.
    #[tokio::test]
    async fn created_ids_merges_with_pre_populated_map() {
        let mut d: Dispatcher<String> = Dispatcher::new();
        d.register(
            "Foo/set",
            Arc::new(EchoHandler(
                json!({"created": {"client-new": {"id": "server-new"}}}),
            )),
        );
        let mut initial = std::collections::HashMap::new();
        initial.insert(Id::from("client-old"), Id::from("server-old"));
        let req = JmapRequest::new(
            vec!["urn:ietf:params:jmap:core".into()],
            vec![("Foo/set".into(), json!({}), "c0".into())],
            Some(initial),
        );
        let resp = d.dispatch(req, "alice".into(), "s0".into()).await;
        let ids = resp
            .created_ids
            .as_ref()
            .expect("created_ids must be Some when client sent createdIds");
        assert_eq!(
            ids.get(&Id::from("client-old")),
            Some(&Id::from("server-old")),
            "pre-populated entry must be preserved"
        );
        assert_eq!(
            ids.get(&Id::from("client-new")),
            Some(&Id::from("server-new")),
            "new /set entry must be merged in"
        );
    }

    /// The caller context given to `dispatch` reaches the handler unchanged.
    #[tokio::test]
    async fn caller_ctx_passed_to_handler() {
        let captured = Arc::new(Mutex::new(None::<String>));
        let mut d: Dispatcher<String> = Dispatcher::new();
        d.register(
            "Foo/get",
            Arc::new(CaptureCallerHandler(Arc::clone(&captured))),
        );
        let req = single_call("Foo/get", json!({}), "c0");
        let resp = d.dispatch(req, "alice".into(), "s0".into()).await;
        assert!(
            resp.method_responses[0].1.get("type").is_none(),
            "must succeed"
        );
        let got = captured
            .lock()
            .expect(POISONED)
            .clone()
            .expect("handler was not called");
        assert_eq!(got, "alice", "caller must be passed through unchanged");
    }

    /// `()` is a valid caller-context type.
    #[tokio::test]
    async fn unit_caller_ctx_works() {
        let mut d: Dispatcher<()> = Dispatcher::new();
        d.register("Foo/get", Arc::new(EchoHandler(json!({"ok": true}))));
        let req = single_call("Foo/get", json!({}), "c0");
        let resp = d.dispatch(req, (), "s0".into()).await;
        assert_eq!(resp.method_responses.len(), 1);
        assert!(
            resp.method_responses[0].1.get("type").is_none(),
            "must succeed with () caller"
        );
    }

    /// Handler that returns one extra invocation next to its primary result.
    struct ExtraInvocationHandler;

    impl JmapHandler<String> for ExtraInvocationHandler {
        fn call(
            &self,
            _method: String,
            _call_id: String,
            _args: Value,
            _caller: String,
        ) -> HandlerFuture {
            Box::pin(async move {
                let primary = json!({"type": "primary"});
                let extra: Vec<Invocation> = vec![(
                    "Extra/call".to_owned(),
                    json!({"type": "extra"}),
                    "x0".to_owned(),
                )];
                Ok((primary, extra))
            })
        }
    }

    /// Extra invocations are emitted verbatim, directly after the primary
    /// response.
    #[tokio::test]
    async fn extra_invocations_appended_after_primary() {
        let mut d: Dispatcher<String> = Dispatcher::new();
        d.register("Sub/set", Arc::new(ExtraInvocationHandler));
        let req = single_call("Sub/set", json!({}), "c0");
        let resp = d.dispatch(req, "alice".into(), "s0".into()).await;

        assert_eq!(
            resp.method_responses.len(),
            2,
            "primary + 1 extra = 2 total invocations"
        );
        assert_eq!(resp.method_responses[0].0, "Sub/set");
        assert_eq!(resp.method_responses[0].2, "c0");
        assert_eq!(resp.method_responses[0].1["type"], "primary");
        assert_eq!(resp.method_responses[1].0, "Extra/call");
        assert_eq!(resp.method_responses[1].2, "x0");
        assert_eq!(resp.method_responses[1].1["type"], "extra");
    }

    /// `ClosureHandler::call` hands the caller context to the closure.
    #[tokio::test]
    async fn closure_handler_forwards_caller() {
        #[derive(Clone)]
        struct Ctx(String);

        struct DummyBackend;

        let received: Arc<Mutex<Option<String>>> = Arc::new(Mutex::new(None));
        let received_clone = Arc::clone(&received);

        let handler: Arc<ClosureHandler<DummyBackend, Ctx>> = Arc::new(ClosureHandler {
            backend: Arc::new(DummyBackend),
            call_fn: Box::new(move |_b, _call_id, _args, ctx| {
                let cap = Arc::clone(&received_clone);
                Box::pin(async move {
                    *cap.lock().expect(POISONED) = Some(ctx.0.clone());
                    Ok((serde_json::json!({}), vec![]))
                })
            }),
        });

        let ctx = Ctx("alice".to_owned());
        handler
            .call("Test/get".into(), "c1".into(), serde_json::json!({}), ctx)
            .await
            .expect("handler must succeed");

        assert_eq!(
            received.lock().expect(POISONED).as_deref(),
            Some("alice"),
            "CallerCtx must be forwarded to the closure"
        );
    }

    /// Compile-time check that `ClosureHandler` implements `JmapHandler`.
    #[test]
    fn closure_handler_is_jmap_handler() {
        fn assert_handler<C: Clone + Send + 'static, H: JmapHandler<C>>(_: &H) {}

        struct DummyBackend;
        #[derive(Clone)]
        struct Ctx;

        let h = ClosureHandler {
            backend: Arc::new(DummyBackend),
            call_fn: Box::new(|_b, _ci, _a, _ctx| {
                Box::pin(async { Ok((serde_json::json!({}), vec![])) })
            }),
        };
        assert_handler::<Ctx, _>(&h);
    }
}