1#![forbid(unsafe_code)]
8
9pub use jmap_types::{
10 Argument, Id, Invocation, JmapError, JmapRequest, JmapResponse, ResultReference, State, UTCDate,
11};
12
13pub mod backend;
14pub mod handlers;
15pub(crate) mod helpers;
16
17pub use backend::{
18 AddedItem, BackendChangesError, BackendSetError, ChangesResult, GetObject, JmapBackend,
19 JmapObject, QueryChangesResult, QueryObject, QueryResult, SetError, SetErrorType, SetObject,
20};
21pub use handlers::{handle_changes, handle_get, handle_query, handle_query_changes};
22pub use helpers::{extract_account_id, not_found_json, now_utc_string, ser};
23
24mod parse;
25mod response;
26
27pub use parse::{parse_request, resolve_args};
28pub use response::{error_invocation, error_status, request_error, RequestError};
29
30use std::{collections::HashMap, fmt, future::Future, pin::Pin, sync::Arc};
31
32use serde_json::Value;
33use tokio::task;
34
35pub type HandlerFuture =
45 Pin<Box<dyn Future<Output = Result<(Value, Vec<Invocation>), JmapError>> + Send>>;
46
47pub trait JmapHandler<CallerCtx>: Send + Sync {
60 fn call(
73 &self,
74 method: String,
75 call_id: String,
76 args: Value,
77 caller: CallerCtx,
78 ) -> HandlerFuture;
79}
80
81pub struct Dispatcher<CallerCtx> {
97 handlers: HashMap<String, Arc<dyn JmapHandler<CallerCtx>>>,
98}
99
100impl<CallerCtx: Clone + Send + 'static> Dispatcher<CallerCtx> {
101 pub fn new() -> Self {
103 Self {
104 handlers: HashMap::new(),
105 }
106 }
107
108 pub fn register(
115 &mut self,
116 method: impl Into<String>,
117 handler: Arc<dyn JmapHandler<CallerCtx>>,
118 ) {
119 self.handlers.insert(method.into(), handler);
120 }
121
122 pub async fn dispatch(
140 &self,
141 request: JmapRequest,
142 caller: CallerCtx,
143 session_state: State,
144 ) -> JmapResponse {
145 let mut method_responses: Vec<Invocation> = Vec::with_capacity(request.method_calls.len());
146 let client_sent_created_ids = request.created_ids.is_some();
147 let mut created_ids: HashMap<Id, Id> = request.created_ids.unwrap_or_default();
148
149 for (method, mut args, call_id) in request.method_calls {
151 if let Err(e) = resolve_args(&mut args, &method_responses) {
153 method_responses.push(error_invocation(&call_id, e));
154 continue;
155 }
156
157 let handler = match self.handlers.get(&method) {
159 Some(h) => Arc::clone(h),
160 None => {
161 method_responses.push(error_invocation(&call_id, JmapError::unknown_method()));
162 continue;
163 }
164 };
165
166 let caller_clone = caller.clone();
167 let method_clone = method.clone();
168 let call_id_clone = call_id.clone();
169
170 let result: Result<
172 Result<(Value, Vec<Invocation>), JmapError>,
173 tokio::task::JoinError,
174 > = task::spawn(async move {
175 handler
176 .call(method_clone, call_id_clone, args, caller_clone)
177 .await
178 })
179 .await;
180
181 match result {
182 Ok(Ok((primary_value, extra_invocations))) => {
183 if client_sent_created_ids {
187 if let Some(map) = primary_value.get("created").and_then(|v| v.as_object())
188 {
189 for (client_id, created_obj) in map {
190 if let Some(id_val) = created_obj.get("id").and_then(|v| v.as_str())
196 {
197 created_ids.insert(client_id.as_str().into(), id_val.into());
198 }
199 }
200 }
201 }
202 method_responses.push((method, primary_value, call_id));
206 method_responses.extend(extra_invocations);
207 }
208 Ok(Err(e)) => {
209 method_responses.push(error_invocation(&call_id, e));
210 }
211 Err(join_err) => {
212 let desc = if join_err.is_cancelled() {
215 "task cancelled"
216 } else {
217 "internal error"
218 };
219 method_responses.push(error_invocation(&call_id, JmapError::server_fail(desc)));
220 }
221 }
222 }
223
224 let created_ids = client_sent_created_ids.then_some(created_ids);
225
226 JmapResponse::new(method_responses, session_state, created_ids)
227 }
228}
229
230impl<CallerCtx: Clone + Send + 'static> Default for Dispatcher<CallerCtx> {
231 fn default() -> Self {
232 Self::new()
233 }
234}
235
236impl<CallerCtx> fmt::Debug for Dispatcher<CallerCtx> {
237 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
238 f.debug_struct("Dispatcher")
239 .field("methods", &self.handlers.keys())
240 .finish()
241 }
242}
243
244pub type BackendCallFn<B> =
250 dyn Fn(Arc<B>, String, serde_json::Value) -> HandlerFuture + Send + Sync + 'static;
251
252pub struct ClosureHandler<B: Send + Sync + 'static> {
267 pub backend: Arc<B>,
268 pub call_fn: Box<BackendCallFn<B>>,
269}
270
271impl<B: Send + Sync + 'static, C: Clone + Send + 'static> JmapHandler<C> for ClosureHandler<B> {
272 fn call(
273 &self,
274 _method: String,
275 call_id: String,
276 args: serde_json::Value,
277 _caller: C,
278 ) -> HandlerFuture {
279 (self.call_fn)(Arc::clone(&self.backend), call_id, args)
280 }
281}
282
283#[cfg(test)]
284mod tests {
285 use super::*;
286 use serde_json::{json, Value};
287 use std::sync::{Arc, Mutex};
288
289 #[allow(dead_code)]
293 fn assert_dispatcher_send_sync() {
294 fn check<T: Send + Sync>() {}
295 check::<Dispatcher<String>>();
296 check::<Dispatcher<()>>();
297 }
298
299 struct EchoHandler(Value);
305
306 impl<C: Clone + Send + 'static> JmapHandler<C> for EchoHandler {
307 fn call(
308 &self,
309 _method: String,
310 _call_id: String,
311 _args: Value,
312 _caller: C,
313 ) -> HandlerFuture {
314 let v = self.0.clone();
315 Box::pin(async move { Ok((v, vec![])) })
316 }
317 }
318
319 struct ErrorHandler(JmapError);
321
322 impl JmapHandler<String> for ErrorHandler {
323 fn call(
324 &self,
325 _method: String,
326 _call_id: String,
327 _args: Value,
328 _caller: String,
329 ) -> HandlerFuture {
330 let e = self.0.clone();
331 Box::pin(async move { Err(e) })
332 }
333 }
334
335 struct CaptureArgsHandler(Arc<Mutex<Option<Value>>>);
337
338 impl JmapHandler<String> for CaptureArgsHandler {
339 fn call(
340 &self,
341 _method: String,
342 _call_id: String,
343 args: Value,
344 _caller: String,
345 ) -> HandlerFuture {
346 let slot = self.0.clone();
347 Box::pin(async move {
348 *slot.lock().expect("test: mutex poisoned") = Some(args);
349 Ok((json!({}), vec![]))
350 })
351 }
352 }
353
354 struct CaptureCallerHandler(Arc<Mutex<Option<String>>>);
356
357 impl JmapHandler<String> for CaptureCallerHandler {
358 fn call(
359 &self,
360 _method: String,
361 _call_id: String,
362 _args: Value,
363 caller: String,
364 ) -> HandlerFuture {
365 let slot = self.0.clone();
366 Box::pin(async move {
367 *slot.lock().expect("test: mutex poisoned") = Some(caller);
368 Ok((json!({}), vec![]))
369 })
370 }
371 }
372
373 struct PanicHandler;
375
376 impl JmapHandler<String> for PanicHandler {
377 fn call(
378 &self,
379 _method: String,
380 _call_id: String,
381 _args: Value,
382 _caller: String,
383 ) -> HandlerFuture {
384 Box::pin(async move { panic!("deliberate test panic") })
385 }
386 }
387
388 fn single_call(method: &str, args: Value, call_id: &str) -> JmapRequest {
393 JmapRequest::new(
394 vec!["urn:ietf:params:jmap:core".into()],
395 vec![(method.into(), args, call_id.into())],
396 None,
397 )
398 }
399
400 #[tokio::test]
406 async fn unknown_method_returns_error_invocation() {
407 let d: Dispatcher<String> = Dispatcher::new();
408 let req = single_call("Foo/get", json!({}), "c0");
409 let resp = d.dispatch(req, "alice".into(), "s0".into()).await;
410 assert_eq!(resp.method_responses.len(), 1);
411 let (_, args, call_id) = &resp.method_responses[0];
412 assert_eq!(call_id, "c0");
413 assert_eq!(args["type"], "unknownMethod");
414 }
415
416 #[tokio::test]
418 async fn known_method_success() {
419 let mut d: Dispatcher<String> = Dispatcher::new();
420 d.register("Foo/get", Arc::new(EchoHandler(json!({"list": []}))));
421 let req = single_call("Foo/get", json!({}), "c1");
422 let resp = d.dispatch(req, "alice".into(), "s0".into()).await;
423 assert_eq!(resp.method_responses.len(), 1);
424 let (method, args, call_id) = &resp.method_responses[0];
425 assert_eq!(method, "Foo/get");
426 assert_eq!(call_id, "c1");
427 assert_eq!(args["list"], json!([]));
428 }
429
430 #[tokio::test]
432 async fn handler_returns_error() {
433 let mut d: Dispatcher<String> = Dispatcher::new();
434 d.register("Foo/get", Arc::new(ErrorHandler(JmapError::not_found())));
435 let req = single_call("Foo/get", json!({}), "c2");
436 let resp = d.dispatch(req, "alice".into(), "s0".into()).await;
437 assert_eq!(resp.method_responses.len(), 1);
438 let (_, args, _) = &resp.method_responses[0];
439 assert_eq!(args["type"], "notFound");
440 }
441
442 #[tokio::test]
444 async fn session_state_echoed() {
445 let d: Dispatcher<String> = Dispatcher::new();
446 let req = JmapRequest::new(vec!["urn:ietf:params:jmap:core".into()], vec![], None);
447 let resp = d.dispatch(req, "alice".into(), "my-state-123".into()).await;
448 assert_eq!(resp.session_state.as_ref(), "my-state-123");
449 }
450
451 #[tokio::test]
458 async fn mixed_batch_all_responses_in_order() {
459 let mut d: Dispatcher<String> = Dispatcher::new();
460 d.register("M/a", Arc::new(EchoHandler(json!({"ok": true}))));
461 let req = JmapRequest::new(
463 vec!["urn:ietf:params:jmap:core".into()],
464 vec![
465 ("M/a".into(), json!({}), "c0".into()),
466 ("M/b".into(), json!({}), "c1".into()),
467 ("M/a".into(), json!({}), "c2".into()),
468 ],
469 None,
470 );
471 let resp = d.dispatch(req, "alice".into(), "s0".into()).await;
472 assert_eq!(
473 resp.method_responses.len(),
474 3,
475 "all three calls must produce a response"
476 );
477 assert_eq!(resp.method_responses[0].2, "c0");
479 assert!(
480 resp.method_responses[0].1.get("type").is_none(),
481 "c0 must not be an error"
482 );
483 assert_eq!(resp.method_responses[1].2, "c1");
485 assert_eq!(resp.method_responses[1].1["type"], "unknownMethod");
486 assert_eq!(resp.method_responses[2].2, "c2");
488 assert!(
489 resp.method_responses[2].1.get("type").is_none(),
490 "c2 must not be an error"
491 );
492 }
493
494 #[tokio::test]
496 async fn error_does_not_abort_subsequent_calls() {
497 let mut d: Dispatcher<String> = Dispatcher::new();
498 d.register("M/ok", Arc::new(EchoHandler(json!({"ok": true}))));
499 d.register("M/err", Arc::new(ErrorHandler(JmapError::forbidden())));
500 let req = JmapRequest::new(
501 vec!["urn:ietf:params:jmap:core".into()],
502 vec![
503 ("M/err".into(), json!({}), "c0".into()),
504 ("M/ok".into(), json!({}), "c1".into()),
505 ],
506 None,
507 );
508 let resp = d.dispatch(req, "alice".into(), "s0".into()).await;
509 assert_eq!(resp.method_responses.len(), 2);
510 assert_eq!(resp.method_responses[0].1["type"], "forbidden");
511 assert!(
512 resp.method_responses[1].1.get("type").is_none(),
513 "second call must succeed"
514 );
515 }
516
517 #[tokio::test]
523 async fn panicking_handler_returns_server_fail() {
524 let mut d: Dispatcher<String> = Dispatcher::new();
525 d.register("Panic/now", Arc::new(PanicHandler));
526 let req = single_call("Panic/now", json!({}), "c0");
527 let resp = d.dispatch(req, "alice".into(), "s0".into()).await;
528 assert_eq!(resp.method_responses.len(), 1);
529 let (_, args, _) = &resp.method_responses[0];
530 assert_eq!(
531 args["type"], "serverFail",
532 "panicking handler must produce serverFail"
533 );
534 }
535
536 #[tokio::test]
538 async fn panic_message_not_in_response() {
539 let mut d: Dispatcher<String> = Dispatcher::new();
540 d.register("Panic/now", Arc::new(PanicHandler));
541 let req = single_call("Panic/now", json!({}), "c0");
542 let resp = d.dispatch(req, "alice".into(), "s0".into()).await;
543 let (_, args, _) = &resp.method_responses[0];
544 if let Some(desc) = args["description"].as_str() {
545 assert!(
546 !desc.contains("deliberate test panic"),
547 "panic message must not leak into response description"
548 );
549 }
550 }
551
552 #[tokio::test]
558 async fn result_reference_resolved_before_dispatch() {
559 let captured = Arc::new(Mutex::new(None::<Value>));
560 let mut d: Dispatcher<String> = Dispatcher::new();
561 d.register(
562 "Foo/get",
563 Arc::new(EchoHandler(json!({"list": [{"id": "item-1"}]}))),
564 );
565 d.register(
566 "Bar/query",
567 Arc::new(CaptureArgsHandler(Arc::clone(&captured))),
568 );
569 let req = JmapRequest::new(
570 vec!["urn:ietf:params:jmap:core".into()],
571 vec![
572 ("Foo/get".into(), json!({}), "c0".into()),
573 (
574 "Bar/query".into(),
575 json!({"#ids": {"resultOf": "c0", "name": "Foo/get", "path": "/list/0/id"}}),
576 "c1".into(),
577 ),
578 ],
579 None,
580 );
581 let resp = d.dispatch(req, "alice".into(), "s0".into()).await;
582 assert_eq!(resp.method_responses.len(), 2);
583 assert!(
585 resp.method_responses[1].1.get("type").is_none(),
586 "Bar/query must succeed after ResultReference resolution"
587 );
588 let got = captured
590 .lock()
591 .unwrap()
592 .clone()
593 .expect("CaptureArgsHandler was not called");
594 assert_eq!(
595 got["ids"],
596 json!("item-1"),
597 "resolved value must be the string item-1"
598 );
599 assert!(
600 got.get("#ids").is_none(),
601 "#ids key must have been replaced"
602 );
603 }
604
605 #[tokio::test]
607 async fn result_reference_failure_stops_that_call() {
608 let d: Dispatcher<String> = Dispatcher::new();
609 let req = single_call(
610 "Foo/get",
611 json!({"#ids": {"resultOf": "nonexistent", "name": "Foo/get", "path": "/x"}}),
612 "c0",
613 );
614 let resp = d.dispatch(req, "alice".into(), "s0".into()).await;
615 assert_eq!(resp.method_responses.len(), 1);
616 let (_, args, _) = &resp.method_responses[0];
617 assert!(
618 args.get("type").is_some(),
619 "failed ResultReference must produce an error invocation"
620 );
621 }
622
623 #[tokio::test]
630 async fn created_ids_accumulated_from_set_response() {
631 let mut d: Dispatcher<String> = Dispatcher::new();
632 d.register(
633 "Foo/set",
634 Arc::new(EchoHandler(
635 json!({"created": {"client-1": {"id": "server-abc"}}}),
636 )),
637 );
638 let req = JmapRequest::new(
640 vec!["urn:ietf:params:jmap:core".into()],
641 vec![("Foo/set".into(), json!({}), "c0".into())],
642 Some(std::collections::HashMap::new()),
643 );
644 let resp = d.dispatch(req, "alice".into(), "s0".into()).await;
645 let ids = resp
646 .created_ids
647 .as_ref()
648 .expect("created_ids must be Some when client sent createdIds");
649 assert_eq!(
650 ids.get(&Id::from("client-1")),
651 Some(&Id::from("server-abc")),
652 "client-1 must map to server-abc"
653 );
654 }
655
656 #[tokio::test]
658 async fn created_ids_absent_when_no_set() {
659 let mut d: Dispatcher<String> = Dispatcher::new();
660 d.register("Foo/get", Arc::new(EchoHandler(json!({"list": []}))));
661 let req = single_call("Foo/get", json!({}), "c0");
662 let resp = d.dispatch(req, "alice".into(), "s0".into()).await;
663 assert!(
664 resp.created_ids.is_none(),
665 "created_ids must be None when no /set call created objects"
666 );
667 }
668
669 #[tokio::test]
671 async fn created_ids_accumulated_across_multiple_set_calls() {
672 let mut d: Dispatcher<String> = Dispatcher::new();
673 d.register(
674 "A/set",
675 Arc::new(EchoHandler(json!({"created": {"cA": {"id": "sA"}}}))),
676 );
677 d.register(
678 "B/set",
679 Arc::new(EchoHandler(json!({"created": {"cB": {"id": "sB"}}}))),
680 );
681 let req = JmapRequest::new(
683 vec!["urn:ietf:params:jmap:core".into()],
684 vec![
685 ("A/set".into(), json!({}), "c0".into()),
686 ("B/set".into(), json!({}), "c1".into()),
687 ],
688 Some(std::collections::HashMap::new()),
689 );
690 let resp = d.dispatch(req, "alice".into(), "s0".into()).await;
691 let ids = resp
692 .created_ids
693 .as_ref()
694 .expect("created_ids must be Some when client sent createdIds");
695 assert_eq!(
696 ids.get(&Id::from("cA")),
697 Some(&Id::from("sA")),
698 "cA must be present"
699 );
700 assert_eq!(
701 ids.get(&Id::from("cB")),
702 Some(&Id::from("sB")),
703 "cB must be present"
704 );
705 }
706
707 #[tokio::test]
710 async fn created_ids_merges_with_pre_populated_map() {
711 let mut d: Dispatcher<String> = Dispatcher::new();
712 d.register(
713 "Foo/set",
714 Arc::new(EchoHandler(
715 json!({"created": {"client-new": {"id": "server-new"}}}),
716 )),
717 );
718 let mut initial = std::collections::HashMap::new();
720 initial.insert(Id::from("client-old"), Id::from("server-old"));
721 let req = JmapRequest::new(
722 vec!["urn:ietf:params:jmap:core".into()],
723 vec![("Foo/set".into(), json!({}), "c0".into())],
724 Some(initial),
725 );
726 let resp = d.dispatch(req, "alice".into(), "s0".into()).await;
727 let ids = resp
728 .created_ids
729 .as_ref()
730 .expect("created_ids must be Some when client sent createdIds");
731 assert_eq!(
732 ids.get(&Id::from("client-old")),
733 Some(&Id::from("server-old")),
734 "pre-populated entry must be preserved"
735 );
736 assert_eq!(
737 ids.get(&Id::from("client-new")),
738 Some(&Id::from("server-new")),
739 "new /set entry must be merged in"
740 );
741 }
742
743 #[tokio::test]
749 async fn caller_ctx_passed_to_handler() {
750 let captured = Arc::new(Mutex::new(None::<String>));
751 let mut d: Dispatcher<String> = Dispatcher::new();
752 d.register(
753 "Foo/get",
754 Arc::new(CaptureCallerHandler(Arc::clone(&captured))),
755 );
756 let req = single_call("Foo/get", json!({}), "c0");
757 let resp = d.dispatch(req, "alice".into(), "s0".into()).await;
758 assert!(
759 resp.method_responses[0].1.get("type").is_none(),
760 "must succeed"
761 );
762 let got = captured
763 .lock()
764 .unwrap()
765 .clone()
766 .expect("handler was not called");
767 assert_eq!(got, "alice", "caller must be passed through unchanged");
768 }
769
770 #[tokio::test]
772 async fn unit_caller_ctx_works() {
773 let mut d: Dispatcher<()> = Dispatcher::new();
774 d.register("Foo/get", Arc::new(EchoHandler(json!({"ok": true}))));
775 let req = single_call("Foo/get", json!({}), "c0");
776 let resp = d.dispatch(req, (), "s0".into()).await;
777 assert_eq!(resp.method_responses.len(), 1);
778 assert!(
779 resp.method_responses[0].1.get("type").is_none(),
780 "must succeed with () caller"
781 );
782 }
783
784 struct ExtraInvocationHandler;
793
794 impl JmapHandler<String> for ExtraInvocationHandler {
795 fn call(
796 &self,
797 _method: String,
798 _call_id: String,
799 _args: Value,
800 _caller: String,
801 ) -> HandlerFuture {
802 Box::pin(async move {
803 let primary = json!({"type": "primary"});
804 let extra: Vec<Invocation> = vec![(
805 "Extra/call".to_owned(),
806 json!({"type": "extra"}),
807 "x0".to_owned(),
808 )];
809 Ok((primary, extra))
810 })
811 }
812 }
813
814 #[tokio::test]
817 async fn extra_invocations_appended_after_primary() {
818 let mut d: Dispatcher<String> = Dispatcher::new();
819 d.register("Sub/set", Arc::new(ExtraInvocationHandler));
820 let req = single_call("Sub/set", json!({}), "c0");
821 let resp = d.dispatch(req, "alice".into(), "s0".into()).await;
822
823 assert_eq!(
824 resp.method_responses.len(),
825 2,
826 "primary + 1 extra = 2 total invocations"
827 );
828 assert_eq!(resp.method_responses[0].0, "Sub/set");
830 assert_eq!(resp.method_responses[0].2, "c0");
831 assert_eq!(resp.method_responses[0].1["type"], "primary");
832 assert_eq!(resp.method_responses[1].0, "Extra/call");
834 assert_eq!(resp.method_responses[1].2, "x0");
835 assert_eq!(resp.method_responses[1].1["type"], "extra");
836 }
837}