1use std::{collections::HashMap, str::FromStr, sync::Arc};
13
14use async_trait::async_trait;
15use calcard::{icalendar::ICalendar, jscalendar::JSCalendar};
16use http::{Request, Response};
17use hyper::body::Incoming;
18use libjmap::{ChangeStatus, ChangesResponse, JmapClient, calendar::Calendar};
19use log::{debug, info, trace};
20use serde_json::Value;
21use tokio::sync::Mutex;
22use tower::Service;
23
24use crate::{
25 CollectionId, ErrorKind, Etag, Href, ItemKind, Result,
26 base::{
27 Collection, CollectionChanges, CreateItemOptions, FetchedItem, Item, ItemVersion, Storage,
28 },
29 disco::{DiscoveredCollection, Discovery},
30 jmap::cache::{StateCache, StateChanges},
31 property::Property,
32};
33
34pub struct JmapStorageBuilder<C>
36where
37 C: Service<Request<String>, Response = Response<Incoming>> + Send + Sync + 'static,
38 C::Error: std::error::Error + Send + Sync,
39 C::Future: Send + Sync,
40{
41 client: JmapClient<C>,
42}
43
44impl<C> JmapStorageBuilder<C>
45where
46 C: Service<Request<String>, Response = Response<Incoming>> + Send + Sync + 'static,
47 C::Error: std::error::Error + Send + Sync,
48 C::Future: Send + Sync,
49{
50 #[must_use]
52 pub fn build(self, item_kind: ItemKind) -> JmapStorage<C> {
53 if item_kind == ItemKind::AddressBook {
54 todo!("JMAP Contacts is not implemented");
55 }
56 JmapStorage {
57 inner: Arc::new(Mutex::new((self.client, StateCache::new()))),
58 item_kind,
59 }
60 }
61}
62
63pub struct JmapStorage<C>
84where
85 C: Service<Request<String>, Response = Response<Incoming>> + Send + Sync + 'static,
86 C::Error: std::error::Error + Send + Sync,
87 C::Future: Send + Sync,
88{
89 inner: Arc<Mutex<(JmapClient<C>, StateCache)>>,
90 item_kind: ItemKind,
91}
92
93impl<C> JmapStorage<C>
94where
95 C: Service<Request<String>, Response = Response<Incoming>> + Send + Sync + 'static,
96 C::Error: std::error::Error + Send + Sync,
97 C::Future: Send + Sync,
98{
99 #[must_use]
101 pub fn builder(client: JmapClient<C>) -> JmapStorageBuilder<C> {
102 JmapStorageBuilder { client }
103 }
104}
105
/// Builds an item href of the form `<collection_id>/<object_id>`.
fn href_for_item(collection_id: &str, object_id: &str) -> String {
    [collection_id, object_id].join("/")
}
109
110fn parse_item_href(href: &str) -> Result<(&str, &str)> {
111 href.split_once('/')
112 .ok_or_else(|| ErrorKind::InvalidInput.error("Invalid href format"))
113}
114
115fn jscalendar_to_icalendar(jscalendar: &Value) -> Result<Item> {
116 let jscalendar_str = jscalendar.to_string();
117 let jscalendar = JSCalendar::parse(&jscalendar_str)
118 .map_err(|e| ErrorKind::InvalidData.error(format!("Failed to parse JSCalendar: {e}")))?;
119 let icalendar = jscalendar
120 .into_icalendar()
121 .ok_or_else(|| ErrorKind::InvalidData.error("Failed to convert JSCalendar to iCalendar"))?;
122 Ok(Item::from(icalendar.to_string()))
123}
124
125fn icalendar_to_jscalendar(icalendar_str: &str) -> Result<Value> {
126 let icalendar = ICalendar::parse(icalendar_str)
127 .map_err(|e| ErrorKind::InvalidInput.error(format!("Failed to parse iCalendar: {e:?}")))?;
128 let jscalendar = icalendar.into_jscalendar();
129 serde_json::to_value(&jscalendar.0)
130 .map_err(|e| ErrorKind::Io.error(format!("Failed to serialize JSCalendar: {e}")))
131}
132
133impl<C> JmapStorage<C>
134where
135 C: Service<Request<String>, Response = Response<Incoming>> + Send + Sync + 'static,
136 C::Error: std::error::Error + Send + Sync,
137 C::Future: Send + Sync,
138{
139 async fn check_state_mismatch(
144 err: libjmap::error::Error,
145 client: &JmapClient<C>,
146 cache: &mut StateCache,
147 object_id: &str,
148 old_state: &str,
149 ) -> Result<(ChangeStatus, String)> {
150 if let libjmap::error::Error::ServerError { ref error_type, .. } = err
151 && error_type == "stateMismatch"
152 {
153 Self::cached_changed_since(client, cache, object_id, old_state).await
154 } else {
155 Err(ErrorKind::Io.error(err))
156 }
157 }
158
159 async fn cached_changed_since(
167 client: &JmapClient<C>,
168 cache: &mut StateCache,
169 object_id: &str,
170 old_state: &str,
171 ) -> Result<(ChangeStatus, String)> {
172 if let Some(status) = cache.query_changes(old_state, object_id) {
173 if let Some(latest_state) = cache.latest_state() {
175 info!(
176 "State cache hit for object_id={object_id}, from_state={old_state}, status={status:?}"
177 );
178 return Ok((status, latest_state.to_string()));
179 }
180 }
182
183 info!("State cache miss for object_id={object_id}, from_state={old_state}");
185
186 let mut current_state = old_state.to_string();
187 loop {
188 let changes_response = client
189 .changes::<Calendar>(¤t_state, Some(500))
190 .await
191 .map_err(|err| ErrorKind::Io.error(err))?;
192
193 let ChangesResponse {
194 new_state,
195 has_more_changes,
196 created,
197 updated,
198 destroyed,
199 ..
200 } = changes_response;
201
202 let mut changes = StateChanges::new();
204 changes.created.extend(created);
205 changes.updated.extend(updated);
206 changes.destroyed.extend(destroyed);
207 cache.add_transition(current_state.clone(), new_state.clone(), changes);
208
209 if !has_more_changes {
210 let status = cache
211 .query_changes(old_state, object_id)
212 .expect("Cache should have the status after populating from changes");
213 return Ok((status, new_state));
214 }
215
216 current_state = new_state;
217 }
218 }
219}
220
221#[async_trait]
222impl<C> Storage for JmapStorage<C>
223where
224 C: Service<Request<String>, Response = Response<Incoming>> + Send + Sync + 'static,
225 C::Error: std::error::Error + Send + Sync,
226 C::Future: Send + Sync,
227{
228 fn item_kind(&self) -> ItemKind {
229 self.item_kind
230 }
231
232 async fn check(&self) -> Result<()> {
233 let (client, _cache) = &*self.inner.lock().await;
235 let session = client.get_session_resource().await.map_err(|e| {
236 ErrorKind::Unavailable.error(format!("Failed to fetch JMAP session: {e}"))
237 })?;
238
239 if !session.supports_calendars() {
241 return Err(ErrorKind::Unsupported.error("JMAP server does not support calendars"));
242 }
243
244 Ok(())
245 }
246
247 async fn discover_collections(&self) -> Result<Discovery> {
248 let (client, _cache) = &*self.inner.lock().await;
249 let calendars = client
250 .get_collections::<Calendar>()
251 .await
252 .map_err(|err| ErrorKind::Io.error(err))?; debug!("Discovered {} collections.", calendars.len());
254
255 let collections = calendars
256 .into_iter()
257 .map(|ab| {
258 let id = CollectionId::from_str(&ab.id).map_err(|e| {
260 ErrorKind::InvalidInput
261 .error(format!("Invalid collection id '{}': {}", ab.id, e))
262 })?;
263 Ok(DiscoveredCollection::new(ab.id.clone(), id))
264 })
265 .collect::<Result<Vec<_>>>()?;
266
267 Discovery::try_from(collections).map_err(|e| ErrorKind::InvalidData.error(e))
268 }
269
270 async fn create_collection(&self, _href: &str) -> Result<Collection> {
271 Err(ErrorKind::Unsupported
272 .error("JMAP does not support creating collections with specified hrefs"))
273 }
274
275 async fn delete_collection(&self, href: &str) -> Result<()> {
276 let items = self.list_items(href).await?;
278 if !items.is_empty() {
279 return Err(ErrorKind::CollectionNotEmpty.into()); }
281
282 let (client, _cache) = &*self.inner.lock().await;
285 client
286 .delete_collection::<Calendar>(href, None)
287 .await
288 .map_err(|err| ErrorKind::Io.error(err))?; Ok(())
291 }
292
293 async fn list_items(&self, collection_href: &str) -> Result<Vec<ItemVersion>> {
294 let (client, _cache) = &*self.inner.lock().await;
295 let records = client
296 .get_records::<Calendar>(collection_href)
297 .await
298 .map_err(|err| ErrorKind::Io.error(err))?; Ok(records
301 .into_iter()
302 .map(|record| {
303 ItemVersion::new(
304 href_for_item(collection_href, &record.id),
305 Etag::from(record.state),
306 )
307 })
308 .collect())
309 }
310
311 async fn changed_since(
312 &self,
313 collection: &str,
314 since_state: Option<&str>,
315 ) -> Result<CollectionChanges> {
316 let (client, cache) = &mut *self.inner.lock().await;
317
318 match since_state {
319 None => {
320 let records = client
321 .get_records::<Calendar>(collection)
322 .await
323 .map_err(|err| ErrorKind::Io.error(err))?;
324
325 let new_state = records
327 .first()
328 .map(|r| Some(r.state.clone()))
329 .unwrap_or_default();
330
331 Ok(CollectionChanges {
332 new_state,
333 changed: records
334 .into_iter()
335 .map(|r| href_for_item(collection, &r.id))
336 .collect(),
337 deleted: Vec::new(),
338 })
339 }
340 Some(since_state) => {
341 let mut current_state = since_state.to_string();
342 let mut all_changed = Vec::new();
343 let mut all_deleted = Vec::new();
344
345 let new_state = loop {
346 let changes_response = client
347 .changes::<Calendar>(¤t_state, None)
348 .await
349 .map_err(|err| ErrorKind::Io.error(err))?;
350
351 let ChangesResponse {
352 new_state,
353 has_more_changes,
354 created,
355 updated,
356 destroyed,
357 ..
358 } = changes_response;
359
360 let mut changes = StateChanges::new();
362 changes.created.extend(created.iter().cloned());
363 changes.updated.extend(updated.iter().cloned());
364 changes.destroyed.extend(destroyed.iter().cloned());
365 cache.add_transition(current_state.clone(), new_state.clone(), changes);
366
367 all_changed.extend(created);
369 all_changed.extend(updated);
370 all_deleted.extend(destroyed);
371
372 if !has_more_changes {
373 break Some(new_state);
374 }
375 current_state = new_state;
376 };
377
378 Ok(CollectionChanges {
379 new_state,
380 changed: all_changed
381 .into_iter()
382 .map(|id| href_for_item(collection, &id))
383 .collect(),
384 deleted: all_deleted
385 .into_iter()
386 .map(|id| href_for_item(collection, &id))
387 .collect(),
388 })
389 }
390 }
391 }
392
393 async fn get_item(&self, href: &str) -> Result<(Item, Etag)> {
394 let (collection_id, object_id) = parse_item_href(href)?;
395
396 trace!("Getting JMAP records for collection {collection_id}.");
397 let (client, _cache) = &*self.inner.lock().await;
398 let records = client
399 .get_records::<Calendar>(collection_id)
400 .await
401 .map_err(|err| ErrorKind::Io.error(err))?; let record = records
404 .into_iter()
405 .find(|r| r.id == object_id)
406 .ok_or_else(|| ErrorKind::DoesNotExist.error("Item not found"))?;
407
408 let item = jscalendar_to_icalendar(&record.data)?;
409 Ok((item, Etag::from(record.state)))
410 }
411
412 async fn get_many_items(&self, hrefs: &[&str]) -> Result<Vec<FetchedItem>> {
413 if hrefs.is_empty() {
414 return Ok(Vec::new());
415 }
416
417 let mut collections: HashMap<&str, Vec<&str>> = HashMap::new();
419 for href in hrefs {
420 let (collection_id, object_id) = parse_item_href(href)?;
421 collections
422 .entry(collection_id)
423 .or_default()
424 .push(object_id);
425 }
426
427 let mut fetched_items = Vec::new();
428 let (client, _cache) = &*self.inner.lock().await;
429
430 for (collection_id, object_ids) in collections {
434 let records = client
435 .get_records::<Calendar>(collection_id)
436 .await
437 .map_err(|err| ErrorKind::Io.error(err))?; for record in records {
440 if object_ids.contains(&record.id.as_str()) {
441 let item = jscalendar_to_icalendar(&record.data)?;
442 fetched_items.push(FetchedItem {
443 href: href_for_item(collection_id, &record.id),
444 item,
445 etag: Etag::from(record.state),
446 });
447 }
448 }
449 }
450
451 Ok(fetched_items)
452 }
453
454 async fn create_item(
455 &self,
456 collection: &str,
457 item: &Item,
458 _opts: CreateItemOptions,
459 ) -> Result<ItemVersion> {
460 let (client, cache) = &mut *self.inner.lock().await;
461
462 let old_state = cache.latest_state().map(String::from);
464
465 let json_jscalendar = icalendar_to_jscalendar(item.as_str())?;
466 let record = client
467 .create_record::<Calendar>(collection, &json_jscalendar, old_state.as_deref())
468 .await
469 .map_err(|err| ErrorKind::Io.error(err))?; trace!(
471 "Created JSCalendar collection={}, id={}.",
472 collection, &record.id
473 );
474
475 if let Some(old_state) = old_state {
476 cache.add_transition(
477 old_state,
478 record.state.clone(),
479 StateChanges::created(record.id.clone()),
480 );
481 } else {
482 cache.add_transition(
484 record.state.clone(), record.state.clone(),
486 StateChanges::created(record.id.clone()),
487 );
488 }
489
490 Ok(ItemVersion::new(
491 href_for_item(collection, &record.id),
492 Etag::from(record.state),
493 ))
494 }
495
496 async fn update_item(&self, href: &str, etag: &Etag, item: &Item) -> Result<Etag> {
497 let (collection_id, object_id) = parse_item_href(href)?;
498 let json_jscalendar = icalendar_to_jscalendar(item.as_str())?;
499
500 let (client, cache) = &mut *self.inner.lock().await;
501
502 let result = client
503 .update_record::<Calendar>(object_id, collection_id, &json_jscalendar, Some(&etag.0))
504 .await;
505
506 match result {
507 Ok(record) => {
508 cache.add_transition(
509 etag.0.clone(),
510 record.state.clone(),
511 StateChanges::updated(object_id.to_string()),
512 );
513
514 Ok(Etag::from(record.state))
515 }
516 Err(err) => {
517 let (status, new_state) =
518 Self::check_state_mismatch(err, client, cache, object_id, &etag.0).await?;
519
520 match status {
521 ChangeStatus::NotChanged => {
522 let record = client
523 .update_record::<Calendar>(
524 object_id,
525 collection_id,
526 &json_jscalendar,
527 Some(&new_state),
528 )
529 .await
530 .map_err(|err| ErrorKind::Io.error(err))?;
531
532 cache.add_transition(
533 new_state,
534 record.state.clone(),
535 StateChanges::updated(object_id.to_string()),
536 );
537
538 Ok(Etag::from(record.state))
539 }
540 ChangeStatus::Changed | ChangeStatus::Deleted => {
541 Err(ErrorKind::InvalidData
542 .error("Etag does not match; item has been modified"))
543 }
544 }
545 }
546 }
547 }
548
549 async fn delete_item(&self, href: &str, etag: &Etag) -> Result<()> {
550 let (_collection_id, object_id) = parse_item_href(href)?;
551
552 let (client, cache) = &mut *self.inner.lock().await;
553
554 let result = client
555 .delete_record::<Calendar>(object_id, Some(&etag.0))
556 .await;
557
558 match result {
559 Ok(new_state) => {
560 cache.add_transition(
562 etag.0.clone(),
563 new_state,
564 StateChanges::destroyed(object_id.to_string()),
565 );
566
567 Ok(())
568 }
569 Err(err) => {
570 let (status, new_state) =
571 Self::check_state_mismatch(err, client, cache, object_id, &etag.0).await?;
572
573 match status {
574 ChangeStatus::NotChanged => {
575 let final_state = client
576 .delete_record::<Calendar>(object_id, Some(&new_state))
577 .await
578 .map_err(|err| ErrorKind::Io.error(err))?;
579
580 cache.add_transition(
581 new_state,
582 final_state,
583 StateChanges::destroyed(object_id.to_string()),
584 );
585
586 Ok(())
587 }
588 ChangeStatus::Changed => {
589 Err(ErrorKind::InvalidData
590 .error("Etag does not match; item has been modified"))
591 }
592 ChangeStatus::Deleted => Ok(()),
593 }
594 }
595 }
596 }
597
598 async fn get_property(&self, href: &str, property: Property) -> Result<Option<String>> {
599 if !property.is_valid_for(self.item_kind) {
600 return Err(ErrorKind::InvalidInput.error(format!(
601 "property '{property}' not valid for {:?}",
602 self.item_kind
603 )));
604 }
605
606 let (client, _cache) = &*self.inner.lock().await;
607 let calendars = client
608 .get_collections::<Calendar>()
609 .await
610 .map_err(|err| ErrorKind::Io.error(err))?;
611
612 let cal = calendars.into_iter().find(|cal| cal.id == href);
613
614 Ok(match property {
615 Property::DisplayName => cal.map(|cal| cal.name),
616 Property::Description => cal.and_then(|cal| cal.description),
617 Property::Colour => cal.and_then(|cal| cal.color),
618 Property::Order => cal.map(|cal| cal.sort_order.to_string()),
619 })
620 }
621
622 async fn set_property(&self, _href: &str, _property: Property, _value: &str) -> Result<()> {
623 Err(ErrorKind::Unsupported.error("Not yet implemented"))
624 }
625
626 async fn unset_property(&self, _href: &str, _property: Property) -> Result<()> {
627 Err(ErrorKind::Unsupported.error("Not yet implemented"))
628 }
629
630 fn href_for_collection_id(&self, id: &CollectionId) -> Result<Href> {
631 Ok(id.to_string())
632 }
633}