Skip to main content

vstorage/jmap/
storage.rs

1// Copyright 2025 Hugo Osvaldo Barrera
2//
3// SPDX-License-Identifier: EUPL-1.2
4
5//! A [`JmapStorage`] implements the Storage trait for JMAP servers.
6//!
//! JMAP (JSON Meta Application Protocol) is defined in RFC 8620. This implementation
//! currently supports calendars only, converting items between JSCalendar and
//! iCalendar. JMAP Contacts is not implemented yet.
9//!
10//! This module is only available when the `jmap` feature is enabled.
11
12use std::{collections::HashMap, str::FromStr, sync::Arc};
13
14use async_trait::async_trait;
15use calcard::{icalendar::ICalendar, jscalendar::JSCalendar};
16use http::{Request, Response};
17use hyper::body::Incoming;
18use libjmap::{ChangeStatus, ChangesResponse, JmapClient, calendar::Calendar};
19use log::{debug, info, trace};
20use serde_json::Value;
21use tokio::sync::Mutex;
22use tower::Service;
23
24use crate::{
25    CollectionId, ErrorKind, Etag, Href, ItemKind, Result,
26    base::{
27        Collection, CollectionChanges, CreateItemOptions, FetchedItem, Item, ItemVersion, Storage,
28    },
29    disco::{DiscoveredCollection, Discovery},
30    jmap::cache::{StateCache, StateChanges},
31    property::Property,
32};
33
34/// Builder for [`JmapStorage`].
35pub struct JmapStorageBuilder<C>
36where
37    C: Service<Request<String>, Response = Response<Incoming>> + Send + Sync + 'static,
38    C::Error: std::error::Error + Send + Sync,
39    C::Future: Send + Sync,
40{
41    client: JmapClient<C>,
42}
43
44impl<C> JmapStorageBuilder<C>
45where
46    C: Service<Request<String>, Response = Response<Incoming>> + Send + Sync + 'static,
47    C::Error: std::error::Error + Send + Sync,
48    C::Future: Send + Sync,
49{
50    /// Build the storage instance.
51    #[must_use]
52    pub fn build(self, item_kind: ItemKind) -> JmapStorage<C> {
53        if item_kind == ItemKind::AddressBook {
54            todo!("JMAP Contacts is not implemented");
55        }
56        JmapStorage {
57            inner: Arc::new(Mutex::new((self.client, StateCache::new()))),
58            item_kind,
59        }
60    }
61}
62
63/// Storage backed by a JMAP server.
64///
65/// A single storage represents a single JMAP server with a specific set of credentials.
66/// Focuses on calendar support using Icalendar format.
67///
68/// # Caveats
69///
70/// Write operations are serialized. This is necessary due to JMAP's concurrency model.
71/// Operations use `ifInState` to ensure the server is in the expected state. If more than
72/// one concurrent requests specify the same state (e.g., both with `ifInState=S1`), only
73/// one can succeed, and the others will fail with `stateMismatch`. This means operations
74/// MUST be serialized at the protocol level.
75///
76/// Additionally, JMAP states are global (server-wide) opaque strings. The cache tracks a
77/// linear history of state transitions. Concurrent operations would create a branching
78/// history that breaks cache invariants.
79///
80/// In future, we might have a function to apply multiple operations at once. This
81/// requires changes to the [`Storage`]  trait, but would negate the negative implications
82/// of this caveat.
83pub struct JmapStorage<C>
84where
85    C: Service<Request<String>, Response = Response<Incoming>> + Send + Sync + 'static,
86    C::Error: std::error::Error + Send + Sync,
87    C::Future: Send + Sync,
88{
89    inner: Arc<Mutex<(JmapClient<C>, StateCache)>>,
90    item_kind: ItemKind,
91}
92
93impl<C> JmapStorage<C>
94where
95    C: Service<Request<String>, Response = Response<Incoming>> + Send + Sync + 'static,
96    C::Error: std::error::Error + Send + Sync,
97    C::Future: Send + Sync,
98{
99    /// Create a new builder for this storage type.
100    #[must_use]
101    pub fn builder(client: JmapClient<C>) -> JmapStorageBuilder<C> {
102        JmapStorageBuilder { client }
103    }
104}
105
/// Compose the href of an item from its collection id and its object id.
///
/// The two parts are joined with a single `/`.
fn href_for_item(collection_id: &str, object_id: &str) -> String {
    [collection_id, object_id].join("/")
}
109
110fn parse_item_href(href: &str) -> Result<(&str, &str)> {
111    href.split_once('/')
112        .ok_or_else(|| ErrorKind::InvalidInput.error("Invalid href format"))
113}
114
115fn jscalendar_to_icalendar(jscalendar: &Value) -> Result<Item> {
116    let jscalendar_str = jscalendar.to_string();
117    let jscalendar = JSCalendar::parse(&jscalendar_str)
118        .map_err(|e| ErrorKind::InvalidData.error(format!("Failed to parse JSCalendar: {e}")))?;
119    let icalendar = jscalendar
120        .into_icalendar()
121        .ok_or_else(|| ErrorKind::InvalidData.error("Failed to convert JSCalendar to iCalendar"))?;
122    Ok(Item::from(icalendar.to_string()))
123}
124
125fn icalendar_to_jscalendar(icalendar_str: &str) -> Result<Value> {
126    let icalendar = ICalendar::parse(icalendar_str)
127        .map_err(|e| ErrorKind::InvalidInput.error(format!("Failed to parse iCalendar: {e:?}")))?;
128    let jscalendar = icalendar.into_jscalendar();
129    serde_json::to_value(&jscalendar.0)
130        .map_err(|e| ErrorKind::Io.error(format!("Failed to serialize JSCalendar: {e}")))
131}
132
133impl<C> JmapStorage<C>
134where
135    C: Service<Request<String>, Response = Response<Incoming>> + Send + Sync + 'static,
136    C::Error: std::error::Error + Send + Sync,
137    C::Future: Send + Sync,
138{
139    /// If `err` is a `stateMismatch`, query the state cache to determine whether the
140    /// specific item actually changed and return the change status plus the current state.
141    ///
142    /// If `err` is any other error, it is returned as `ErrorKind::Io`.
143    async fn check_state_mismatch(
144        err: libjmap::error::Error,
145        client: &JmapClient<C>,
146        cache: &mut StateCache,
147        object_id: &str,
148        old_state: &str,
149    ) -> Result<(ChangeStatus, String)> {
150        if let libjmap::error::Error::ServerError { ref error_type, .. } = err
151            && error_type == "stateMismatch"
152        {
153            Self::cached_changed_since(client, cache, object_id, old_state).await
154        } else {
155            Err(ErrorKind::Io.error(err))
156        }
157    }
158
159    /// Check if an item has changed since a given state.
160    ///
161    /// First check the state cache to see if we can determine the change status without
162    /// making a network request. On cache miss, it falls back to querying the server for
163    /// changes and populates the cache with all results.
164    ///
165    /// Returns whether the specified item has changed and the latest known state.
166    async fn cached_changed_since(
167        client: &JmapClient<C>,
168        cache: &mut StateCache,
169        object_id: &str,
170        old_state: &str,
171    ) -> Result<(ChangeStatus, String)> {
172        if let Some(status) = cache.query_changes(old_state, object_id) {
173            // Cache hit. Return status and current state.
174            if let Some(latest_state) = cache.latest_state() {
175                info!(
176                    "State cache hit for object_id={object_id}, from_state={old_state}, status={status:?}"
177                );
178                return Ok((status, latest_state.to_string()));
179            }
180            // else: latest_state should always return Some(_).
181        }
182
183        // Cache miss.
184        info!("State cache miss for object_id={object_id}, from_state={old_state}");
185
186        let mut current_state = old_state.to_string();
187        loop {
188            let changes_response = client
189                .changes::<Calendar>(&current_state, Some(500))
190                .await
191                .map_err(|err| ErrorKind::Io.error(err))?;
192
193            let ChangesResponse {
194                new_state,
195                has_more_changes,
196                created,
197                updated,
198                destroyed,
199                ..
200            } = changes_response;
201
202            // Build and store StateChanges from the response.
203            let mut changes = StateChanges::new();
204            changes.created.extend(created);
205            changes.updated.extend(updated);
206            changes.destroyed.extend(destroyed);
207            cache.add_transition(current_state.clone(), new_state.clone(), changes);
208
209            if !has_more_changes {
210                let status = cache
211                    .query_changes(old_state, object_id)
212                    .expect("Cache should have the status after populating from changes");
213                return Ok((status, new_state));
214            }
215
216            current_state = new_state;
217        }
218    }
219}
220
221#[async_trait]
222impl<C> Storage for JmapStorage<C>
223where
224    C: Service<Request<String>, Response = Response<Incoming>> + Send + Sync + 'static,
225    C::Error: std::error::Error + Send + Sync,
226    C::Future: Send + Sync,
227{
228    fn item_kind(&self) -> ItemKind {
229        self.item_kind
230    }
231
232    async fn check(&self) -> Result<()> {
233        // Check connectivity and required capabilities by fetching session resource.
234        let (client, _cache) = &*self.inner.lock().await;
235        let session = client.get_session_resource().await.map_err(|e| {
236            ErrorKind::Unavailable.error(format!("Failed to fetch JMAP session: {e}"))
237        })?;
238
239        // TODO: if item_kind is address_book, check the other capability.
240        if !session.supports_calendars() {
241            return Err(ErrorKind::Unsupported.error("JMAP server does not support calendars"));
242        }
243
244        Ok(())
245    }
246
247    async fn discover_collections(&self) -> Result<Discovery> {
248        let (client, _cache) = &*self.inner.lock().await;
249        let calendars = client
250            .get_collections::<Calendar>()
251            .await
252            .map_err(|err| ErrorKind::Io.error(err))?; // TODO: check ErrorKind
253        debug!("Discovered {} collections.", calendars.len());
254
255        let collections = calendars
256            .into_iter()
257            .map(|ab| {
258                // TODO: check ErrorKind
259                let id = CollectionId::from_str(&ab.id).map_err(|e| {
260                    ErrorKind::InvalidInput
261                        .error(format!("Invalid collection id '{}': {}", ab.id, e))
262                })?;
263                Ok(DiscoveredCollection::new(ab.id.clone(), id))
264            })
265            .collect::<Result<Vec<_>>>()?;
266
267        Discovery::try_from(collections).map_err(|e| ErrorKind::InvalidData.error(e))
268    }
269
270    async fn create_collection(&self, _href: &str) -> Result<Collection> {
271        Err(ErrorKind::Unsupported
272            .error("JMAP does not support creating collections with specified hrefs"))
273    }
274
275    async fn delete_collection(&self, href: &str) -> Result<()> {
276        // First check if the collection is empty.
277        let items = self.list_items(href).await?;
278        if !items.is_empty() {
279            return Err(ErrorKind::CollectionNotEmpty.into()); // check ErrorKind
280        }
281
282        // TODO: should use Etag to avoid races.
283        // (this is a documented caveat anyway; other storages have the same limitation).
284        let (client, _cache) = &*self.inner.lock().await;
285        client
286            .delete_collection::<Calendar>(href, None)
287            .await
288            .map_err(|err| ErrorKind::Io.error(err))?; // check ErrorKind
289
290        Ok(())
291    }
292
293    async fn list_items(&self, collection_href: &str) -> Result<Vec<ItemVersion>> {
294        let (client, _cache) = &*self.inner.lock().await;
295        let records = client
296            .get_records::<Calendar>(collection_href)
297            .await
298            .map_err(|err| ErrorKind::Io.error(err))?; // check ErrorKind
299
300        Ok(records
301            .into_iter()
302            .map(|record| {
303                ItemVersion::new(
304                    href_for_item(collection_href, &record.id),
305                    Etag::from(record.state),
306                )
307            })
308            .collect())
309    }
310
311    async fn changed_since(
312        &self,
313        collection: &str,
314        since_state: Option<&str>,
315    ) -> Result<CollectionChanges> {
316        let (client, cache) = &mut *self.inner.lock().await;
317
318        match since_state {
319            None => {
320                let records = client
321                    .get_records::<Calendar>(collection)
322                    .await
323                    .map_err(|err| ErrorKind::Io.error(err))?;
324
325                // All records share the same collection state.
326                let new_state = records
327                    .first()
328                    .map(|r| Some(r.state.clone()))
329                    .unwrap_or_default();
330
331                Ok(CollectionChanges {
332                    new_state,
333                    changed: records
334                        .into_iter()
335                        .map(|r| href_for_item(collection, &r.id))
336                        .collect(),
337                    deleted: Vec::new(),
338                })
339            }
340            Some(since_state) => {
341                let mut current_state = since_state.to_string();
342                let mut all_changed = Vec::new();
343                let mut all_deleted = Vec::new();
344
345                let new_state = loop {
346                    let changes_response = client
347                        .changes::<Calendar>(&current_state, None)
348                        .await
349                        .map_err(|err| ErrorKind::Io.error(err))?;
350
351                    let ChangesResponse {
352                        new_state,
353                        has_more_changes,
354                        created,
355                        updated,
356                        destroyed,
357                        ..
358                    } = changes_response;
359
360                    // Record changes in state transition cache.
361                    let mut changes = StateChanges::new();
362                    changes.created.extend(created.iter().cloned());
363                    changes.updated.extend(updated.iter().cloned());
364                    changes.destroyed.extend(destroyed.iter().cloned());
365                    cache.add_transition(current_state.clone(), new_state.clone(), changes);
366
367                    // Accumulate changes to be returned.
368                    all_changed.extend(created);
369                    all_changed.extend(updated);
370                    all_deleted.extend(destroyed);
371
372                    if !has_more_changes {
373                        break Some(new_state);
374                    }
375                    current_state = new_state;
376                };
377
378                Ok(CollectionChanges {
379                    new_state,
380                    changed: all_changed
381                        .into_iter()
382                        .map(|id| href_for_item(collection, &id))
383                        .collect(),
384                    deleted: all_deleted
385                        .into_iter()
386                        .map(|id| href_for_item(collection, &id))
387                        .collect(),
388                })
389            }
390        }
391    }
392
393    async fn get_item(&self, href: &str) -> Result<(Item, Etag)> {
394        let (collection_id, object_id) = parse_item_href(href)?;
395
396        trace!("Getting JMAP records for collection {collection_id}.");
397        let (client, _cache) = &*self.inner.lock().await;
398        let records = client
399            .get_records::<Calendar>(collection_id)
400            .await
401            .map_err(|err| ErrorKind::Io.error(err))?; // TODO: check ErrorKind
402
403        let record = records
404            .into_iter()
405            .find(|r| r.id == object_id)
406            .ok_or_else(|| ErrorKind::DoesNotExist.error("Item not found"))?;
407
408        let item = jscalendar_to_icalendar(&record.data)?;
409        Ok((item, Etag::from(record.state)))
410    }
411
412    async fn get_many_items(&self, hrefs: &[&str]) -> Result<Vec<FetchedItem>> {
413        if hrefs.is_empty() {
414            return Ok(Vec::new());
415        }
416
417        // Group hrefs by collection.
418        let mut collections: HashMap<&str, Vec<&str>> = HashMap::new();
419        for href in hrefs {
420            let (collection_id, object_id) = parse_item_href(href)?;
421            collections
422                .entry(collection_id)
423                .or_default()
424                .push(object_id);
425        }
426
427        let mut fetched_items = Vec::new();
428        let (client, _cache) = &*self.inner.lock().await;
429
430        // TODO: get_records should be usable without a collection_id.
431        //       this function is feasible with a single network call.
432        //       (with some improvements to libjmap).
433        for (collection_id, object_ids) in collections {
434            let records = client
435                .get_records::<Calendar>(collection_id)
436                .await
437                .map_err(|err| ErrorKind::Io.error(err))?; // TODO: check ErrorKind
438
439            for record in records {
440                if object_ids.contains(&record.id.as_str()) {
441                    let item = jscalendar_to_icalendar(&record.data)?;
442                    fetched_items.push(FetchedItem {
443                        href: href_for_item(collection_id, &record.id),
444                        item,
445                        etag: Etag::from(record.state),
446                    });
447                }
448            }
449        }
450
451        Ok(fetched_items)
452    }
453
454    async fn create_item(
455        &self,
456        collection: &str,
457        item: &Item,
458        _opts: CreateItemOptions,
459    ) -> Result<ItemVersion> {
460        let (client, cache) = &mut *self.inner.lock().await;
461
462        // Get current state before creating (if available in cache).
463        let old_state = cache.latest_state().map(String::from);
464
465        let json_jscalendar = icalendar_to_jscalendar(item.as_str())?;
466        let record = client
467            .create_record::<Calendar>(collection, &json_jscalendar, old_state.as_deref())
468            .await
469            .map_err(|err| ErrorKind::Io.error(err))?; // TODO: check ErrorKind
470        trace!(
471            "Created JSCalendar collection={}, id={}.",
472            collection, &record.id
473        );
474
475        if let Some(old_state) = old_state {
476            cache.add_transition(
477                old_state,
478                record.state.clone(),
479                StateChanges::created(record.id.clone()),
480            );
481        } else {
482            // First operation. Initialize cache with this state.
483            cache.add_transition(
484                record.state.clone(), // Use new state as both old and new // TODO: review
485                record.state.clone(),
486                StateChanges::created(record.id.clone()),
487            );
488        }
489
490        Ok(ItemVersion::new(
491            href_for_item(collection, &record.id),
492            Etag::from(record.state),
493        ))
494    }
495
496    async fn update_item(&self, href: &str, etag: &Etag, item: &Item) -> Result<Etag> {
497        let (collection_id, object_id) = parse_item_href(href)?;
498        let json_jscalendar = icalendar_to_jscalendar(item.as_str())?;
499
500        let (client, cache) = &mut *self.inner.lock().await;
501
502        let result = client
503            .update_record::<Calendar>(object_id, collection_id, &json_jscalendar, Some(&etag.0))
504            .await;
505
506        match result {
507            Ok(record) => {
508                cache.add_transition(
509                    etag.0.clone(),
510                    record.state.clone(),
511                    StateChanges::updated(object_id.to_string()),
512                );
513
514                Ok(Etag::from(record.state))
515            }
516            Err(err) => {
517                let (status, new_state) =
518                    Self::check_state_mismatch(err, client, cache, object_id, &etag.0).await?;
519
520                match status {
521                    ChangeStatus::NotChanged => {
522                        let record = client
523                            .update_record::<Calendar>(
524                                object_id,
525                                collection_id,
526                                &json_jscalendar,
527                                Some(&new_state),
528                            )
529                            .await
530                            .map_err(|err| ErrorKind::Io.error(err))?;
531
532                        cache.add_transition(
533                            new_state,
534                            record.state.clone(),
535                            StateChanges::updated(object_id.to_string()),
536                        );
537
538                        Ok(Etag::from(record.state))
539                    }
540                    ChangeStatus::Changed | ChangeStatus::Deleted => {
541                        Err(ErrorKind::InvalidData
542                            .error("Etag does not match; item has been modified"))
543                    }
544                }
545            }
546        }
547    }
548
549    async fn delete_item(&self, href: &str, etag: &Etag) -> Result<()> {
550        let (_collection_id, object_id) = parse_item_href(href)?;
551
552        let (client, cache) = &mut *self.inner.lock().await;
553
554        let result = client
555            .delete_record::<Calendar>(object_id, Some(&etag.0))
556            .await;
557
558        match result {
559            Ok(new_state) => {
560                // Record the state transition in cache
561                cache.add_transition(
562                    etag.0.clone(),
563                    new_state,
564                    StateChanges::destroyed(object_id.to_string()),
565                );
566
567                Ok(())
568            }
569            Err(err) => {
570                let (status, new_state) =
571                    Self::check_state_mismatch(err, client, cache, object_id, &etag.0).await?;
572
573                match status {
574                    ChangeStatus::NotChanged => {
575                        let final_state = client
576                            .delete_record::<Calendar>(object_id, Some(&new_state))
577                            .await
578                            .map_err(|err| ErrorKind::Io.error(err))?;
579
580                        cache.add_transition(
581                            new_state,
582                            final_state,
583                            StateChanges::destroyed(object_id.to_string()),
584                        );
585
586                        Ok(())
587                    }
588                    ChangeStatus::Changed => {
589                        Err(ErrorKind::InvalidData
590                            .error("Etag does not match; item has been modified"))
591                    }
592                    ChangeStatus::Deleted => Ok(()),
593                }
594            }
595        }
596    }
597
598    async fn get_property(&self, href: &str, property: Property) -> Result<Option<String>> {
599        if !property.is_valid_for(self.item_kind) {
600            return Err(ErrorKind::InvalidInput.error(format!(
601                "property '{property}' not valid for {:?}",
602                self.item_kind
603            )));
604        }
605
606        let (client, _cache) = &*self.inner.lock().await;
607        let calendars = client
608            .get_collections::<Calendar>()
609            .await
610            .map_err(|err| ErrorKind::Io.error(err))?;
611
612        let cal = calendars.into_iter().find(|cal| cal.id == href);
613
614        Ok(match property {
615            Property::DisplayName => cal.map(|cal| cal.name),
616            Property::Description => cal.and_then(|cal| cal.description),
617            Property::Colour => cal.and_then(|cal| cal.color),
618            Property::Order => cal.map(|cal| cal.sort_order.to_string()),
619        })
620    }
621
622    async fn set_property(&self, _href: &str, _property: Property, _value: &str) -> Result<()> {
623        Err(ErrorKind::Unsupported.error("Not yet implemented"))
624    }
625
626    async fn unset_property(&self, _href: &str, _property: Property) -> Result<()> {
627        Err(ErrorKind::Unsupported.error("Not yet implemented"))
628    }
629
630    fn href_for_collection_id(&self, id: &CollectionId) -> Result<Href> {
631        Ok(id.to_string())
632    }
633}