use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;
use async_trait::async_trait;
use calcard::{icalendar::ICalendar, jscalendar::JSCalendar};
use http::{Request, Response};
use hyper::body::Incoming;
use libjmap::{ChangeStatus, ChangesResponse, JmapClient, calendar::Calendar};
use log::{debug, info, trace};
use serde_json::Value;
use tokio::sync::Mutex;
use tower::Service;
use super::cache::StateCache;
use crate::{
CollectionId, Error, ErrorKind, Etag, Href, ItemKind, Result,
base::{
Collection, CollectionChanges, CreateItemOptions, FetchedItem, Item, ItemVersion, Property,
Storage,
},
calendar::CalendarProperty,
disco::{DiscoveredCollection, Discovery},
jmap::cache::StateChanges,
};
pub struct JmapStorageBuilder<C>
where
C: Service<Request<String>, Response = Response<Incoming>> + Send + Sync + 'static,
C::Error: std::error::Error + Send + Sync,
C::Future: Send + Sync,
{
client: JmapClient<C>,
}
impl<C> JmapStorageBuilder<C>
where
C: Service<Request<String>, Response = Response<Incoming>> + Send + Sync + 'static,
C::Error: std::error::Error + Send + Sync,
C::Future: Send + Sync,
{
#[must_use]
pub fn build(self, item_kind: ItemKind) -> JmapStorage<C> {
if item_kind == ItemKind::AddressBook {
todo!("JMAP Contacts is not implemented");
}
JmapStorage {
inner: Arc::new(Mutex::new((self.client, StateCache::new()))),
item_kind,
}
}
}
pub struct JmapStorage<C>
where
C: Service<Request<String>, Response = Response<Incoming>> + Send + Sync + 'static,
C::Error: std::error::Error + Send + Sync,
C::Future: Send + Sync,
{
inner: Arc<Mutex<(JmapClient<C>, StateCache)>>,
item_kind: ItemKind,
}
impl<C> JmapStorage<C>
where
C: Service<Request<String>, Response = Response<Incoming>> + Send + Sync + 'static,
C::Error: std::error::Error + Send + Sync,
C::Future: Send + Sync,
{
#[must_use]
pub fn builder(client: JmapClient<C>) -> JmapStorageBuilder<C> {
JmapStorageBuilder { client }
}
}
fn href_for_item(collection_id: &str, object_id: &str) -> String {
format!("{collection_id}/{object_id}")
}
fn parse_item_href(href: &str) -> Result<(&str, &str)> {
href.split_once('/')
.ok_or_else(|| Error::new(ErrorKind::InvalidInput, "Invalid href format"))
}
fn jscalendar_to_icalendar(jscalendar: &Value) -> Result<Item> {
let jscalendar_str = jscalendar.to_string();
let jscalendar = JSCalendar::parse(&jscalendar_str).map_err(|e| {
Error::new(
ErrorKind::InvalidData,
format!("Failed to parse JSCalendar: {e}"),
)
})?;
let icalendar = jscalendar.into_icalendar().ok_or_else(|| {
Error::new(
ErrorKind::InvalidData,
"Failed to convert JSCalendar to iCalendar",
)
})?;
Ok(Item::from(icalendar.to_string()))
}
fn icalendar_to_jscalendar(icalendar_str: &str) -> Result<Value> {
let icalendar = ICalendar::parse(icalendar_str).map_err(|e| {
Error::new(
ErrorKind::InvalidInput,
format!("Failed to parse iCalendar: {e:?}"),
)
})?;
let jscalendar = icalendar.into_jscalendar();
serde_json::to_value(&jscalendar.0).map_err(|e| {
Error::new(
ErrorKind::Io,
format!("Failed to serialize JSCalendar: {e}"),
)
})
}
impl<C> JmapStorage<C>
where
C: Service<Request<String>, Response = Response<Incoming>> + Send + Sync + 'static,
C::Error: std::error::Error + Send + Sync,
C::Future: Send + Sync,
{
async fn cached_changed_since(
client: &JmapClient<C>,
cache: &mut StateCache,
object_id: &str,
old_state: &str,
) -> Result<(ChangeStatus, String)> {
if let Some(status) = cache.query_changes(old_state, object_id) {
if let Some(latest_state) = cache.latest_state() {
info!(
"State cache hit for object_id={object_id}, from_state={old_state}, status={status:?}"
);
return Ok((status, latest_state.to_string()));
}
}
info!("State cache miss for object_id={object_id}, from_state={old_state}");
let mut current_state = old_state.to_string();
loop {
let changes_response = client
.changes::<Calendar>(¤t_state, Some(500))
.await
.map_err(|err| Error::new(ErrorKind::Io, err))?;
let ChangesResponse {
new_state,
has_more_changes,
created,
updated,
destroyed,
..
} = changes_response;
let mut changes = StateChanges::new();
changes.created.extend(created);
changes.updated.extend(updated);
changes.destroyed.extend(destroyed);
cache.add_transition(current_state.clone(), new_state.clone(), changes);
if !has_more_changes {
let status = cache
.query_changes(old_state, object_id)
.expect("Cache should have the status after populating from changes");
return Ok((status, new_state));
}
current_state = new_state;
}
}
}
#[async_trait]
impl<C> Storage for JmapStorage<C>
where
C: Service<Request<String>, Response = Response<Incoming>> + Send + Sync + 'static,
C::Error: std::error::Error + Send + Sync,
C::Future: Send + Sync,
{
fn item_kind(&self) -> ItemKind {
self.item_kind
}
async fn check(&self) -> Result<()> {
let (client, _cache) = &*self.inner.lock().await;
let session = client.get_session_resource().await.map_err(|e| {
Error::new(
ErrorKind::Unavailable,
format!("Failed to fetch JMAP session: {e}"),
)
})?;
if !session.supports_calendars() {
return Err(Error::new(
ErrorKind::Unsupported,
"JMAP server does not support calendars",
));
}
Ok(())
}
async fn discover_collections(&self) -> Result<Discovery> {
let (client, _cache) = &*self.inner.lock().await;
let calendars = client
.get_collections::<Calendar>()
.await
.map_err(|err| Error::new(ErrorKind::Io, err))?; debug!("Discovered {} collections.", calendars.len());
let collections = calendars
.into_iter()
.map(|ab| {
let id = CollectionId::from_str(&ab.id).map_err(|e| {
Error::new(
ErrorKind::InvalidInput, format!("Invalid collection id '{}': {}", ab.id, e),
)
})?;
Ok(DiscoveredCollection::new(ab.id.clone(), id))
})
.collect::<Result<Vec<_>>>()?;
Discovery::try_from(collections).map_err(|e| Error::new(ErrorKind::InvalidData, e))
}
async fn create_collection(&self, _href: &str) -> Result<Collection> {
Err(Error::new(
ErrorKind::Unsupported,
"JMAP does not support creating collections with specified hrefs",
))
}
async fn delete_collection(&self, href: &str) -> Result<()> {
let items = self.list_items(href).await?;
if !items.is_empty() {
return Err(ErrorKind::CollectionNotEmpty.into()); }
let (client, _cache) = &*self.inner.lock().await;
client
.delete_collection::<Calendar>(href, None)
.await
.map_err(|err| Error::new(ErrorKind::Io, err))?;
Ok(())
}
async fn list_items(&self, collection_href: &str) -> Result<Vec<ItemVersion>> {
let (client, _cache) = &*self.inner.lock().await;
let records = client
.get_records::<Calendar>(collection_href)
.await
.map_err(|err| Error::new(ErrorKind::Io, err))?;
Ok(records
.into_iter()
.map(|record| {
ItemVersion::new(
href_for_item(collection_href, &record.id),
Etag::from(record.state),
)
})
.collect())
}
async fn changed_since(
&self,
collection: &str,
since_state: Option<&str>,
) -> Result<CollectionChanges> {
let (client, cache) = &mut *self.inner.lock().await;
match since_state {
None => {
let records = client
.get_records::<Calendar>(collection)
.await
.map_err(|err| Error::new(ErrorKind::Io, err))?;
let new_state = records
.first()
.map(|r| Some(r.state.clone()))
.unwrap_or_default();
Ok(CollectionChanges {
new_state,
changed: records
.into_iter()
.map(|r| href_for_item(collection, &r.id))
.collect(),
deleted: Vec::new(),
})
}
Some(since_state) => {
let mut current_state = since_state.to_string();
let mut all_changed = Vec::new();
let mut all_deleted = Vec::new();
let new_state = loop {
let changes_response = client
.changes::<Calendar>(¤t_state, None)
.await
.map_err(|err| Error::new(ErrorKind::Io, err))?;
let ChangesResponse {
new_state,
has_more_changes,
created,
updated,
destroyed,
..
} = changes_response;
let mut changes = StateChanges::new();
changes.created.extend(created.iter().cloned());
changes.updated.extend(updated.iter().cloned());
changes.destroyed.extend(destroyed.iter().cloned());
cache.add_transition(current_state.clone(), new_state.clone(), changes);
all_changed.extend(created);
all_changed.extend(updated);
all_deleted.extend(destroyed);
if !has_more_changes {
break Some(new_state);
}
current_state = new_state;
};
Ok(CollectionChanges {
new_state,
changed: all_changed
.into_iter()
.map(|id| href_for_item(collection, &id))
.collect(),
deleted: all_deleted
.into_iter()
.map(|id| href_for_item(collection, &id))
.collect(),
})
}
}
}
async fn get_item(&self, href: &str) -> Result<(Item, Etag)> {
let (collection_id, object_id) = parse_item_href(href)?;
trace!("Getting JMAP records for collection {collection_id}.");
let (client, _cache) = &*self.inner.lock().await;
let records = client
.get_records::<Calendar>(collection_id)
.await
.map_err(|err| Error::new(ErrorKind::Io, err))?;
let record = records
.into_iter()
.find(|r| r.id == object_id)
.ok_or_else(|| Error::new(ErrorKind::DoesNotExist, "Item not found"))?;
let item = jscalendar_to_icalendar(&record.data)?;
Ok((item, Etag::from(record.state)))
}
async fn get_many_items(&self, hrefs: &[&str]) -> Result<Vec<FetchedItem>> {
if hrefs.is_empty() {
return Ok(Vec::new());
}
let mut collections: HashMap<&str, Vec<&str>> = HashMap::new();
for href in hrefs {
let (collection_id, object_id) = parse_item_href(href)?;
collections
.entry(collection_id)
.or_default()
.push(object_id);
}
let mut fetched_items = Vec::new();
let (client, _cache) = &*self.inner.lock().await;
for (collection_id, object_ids) in collections {
let records = client
.get_records::<Calendar>(collection_id)
.await
.map_err(|err| Error::new(ErrorKind::Io, err))?;
for record in records {
if object_ids.contains(&record.id.as_str()) {
let item = jscalendar_to_icalendar(&record.data)?;
fetched_items.push(FetchedItem {
href: href_for_item(collection_id, &record.id),
item,
etag: Etag::from(record.state),
});
}
}
}
Ok(fetched_items)
}
async fn create_item(
&self,
collection: &str,
item: &Item,
_opts: CreateItemOptions,
) -> Result<ItemVersion> {
let (client, cache) = &mut *self.inner.lock().await;
let old_state = cache.latest_state().map(String::from);
let json_jscalendar = icalendar_to_jscalendar(item.as_str())?;
let record = client
.create_record::<Calendar>(collection, &json_jscalendar, old_state.as_deref())
.await
.map_err(|err| Error::new(ErrorKind::Io, err))?; trace!(
"Created JSCalendar collection={}, id={}.",
collection, &record.id
);
if let Some(old_state) = old_state {
cache.add_transition(
old_state,
record.state.clone(),
StateChanges::created(record.id.clone()),
);
} else {
cache.add_transition(
record.state.clone(), record.state.clone(),
StateChanges::created(record.id.clone()),
);
}
Ok(ItemVersion::new(
href_for_item(collection, &record.id),
Etag::from(record.state),
))
}
async fn update_item(&self, href: &str, etag: &Etag, item: &Item) -> Result<Etag> {
let (collection_id, object_id) = parse_item_href(href)?;
let json_jscalendar = icalendar_to_jscalendar(item.as_str())?;
let (client, cache) = &mut *self.inner.lock().await;
let result = client
.update_record::<Calendar>(object_id, collection_id, &json_jscalendar, Some(&etag.0))
.await;
match result {
Ok(record) => {
cache.add_transition(
etag.0.clone(),
record.state.clone(),
StateChanges::updated(object_id.to_string()),
);
Ok(Etag::from(record.state))
}
Err(err) => {
if let libjmap::error::Error::ServerError { ref error_type, .. } = err {
if error_type == "stateMismatch" {
let (change_status, new_state) =
Self::cached_changed_since(client, cache, object_id, &etag.0).await?;
match change_status {
ChangeStatus::NotChanged => {
let record = client
.update_record::<Calendar>(
object_id,
collection_id,
&json_jscalendar,
Some(&new_state),
)
.await
.map_err(|err| Error::new(ErrorKind::Io, err))?;
cache.add_transition(
new_state,
record.state.clone(),
StateChanges::updated(object_id.to_string()),
);
return Ok(Etag::from(record.state));
}
ChangeStatus::Changed | ChangeStatus::Deleted => {
return Err(Error::new(
ErrorKind::InvalidData,
"Etag does not match; item has been modified",
));
}
}
}
}
Err(Error::new(ErrorKind::Io, err))
}
}
}
async fn delete_item(&self, href: &str, etag: &Etag) -> Result<()> {
let (_collection_id, object_id) = parse_item_href(href)?;
let (client, cache) = &mut *self.inner.lock().await;
let result = client
.delete_record::<Calendar>(object_id, Some(&etag.0))
.await;
match result {
Ok(new_state) => {
cache.add_transition(
etag.0.clone(),
new_state,
StateChanges::destroyed(object_id.to_string()),
);
Ok(())
}
Err(err) => {
if let libjmap::error::Error::ServerError { ref error_type, .. } = err {
if error_type == "stateMismatch" {
let (change_status, new_state) =
Self::cached_changed_since(client, cache, object_id, &etag.0).await?;
match change_status {
ChangeStatus::NotChanged => {
let final_state = client
.delete_record::<Calendar>(object_id, Some(&new_state))
.await
.map_err(|err| Error::new(ErrorKind::Io, err))?;
cache.add_transition(
new_state,
final_state,
StateChanges::destroyed(object_id.to_string()),
);
return Ok(());
}
ChangeStatus::Changed => {
return Err(Error::new(
ErrorKind::InvalidData,
"Etag does not match; item has been modified",
));
}
ChangeStatus::Deleted => {
return Ok(()); }
}
}
}
Err(Error::new(ErrorKind::Io, err))
}
}
}
async fn get_property(&self, href: &str, property: Property) -> Result<Option<String>> {
let (client, _cache) = &*self.inner.lock().await;
match property {
Property::Calendar(CalendarProperty::DisplayName) => {
let calendars = client
.get_collections::<Calendar>()
.await
.map_err(|err| Error::new(ErrorKind::Io, err))?;
Ok(calendars
.into_iter()
.find(|cal| cal.id == href)
.map(|cal| cal.name))
}
Property::Calendar(CalendarProperty::Description) => {
let calendars = client
.get_collections::<Calendar>()
.await
.map_err(|err| Error::new(ErrorKind::Io, err))?;
Ok(calendars
.into_iter()
.find(|cal| cal.id == href)
.and_then(|cal| cal.description))
}
Property::Calendar(CalendarProperty::Colour) => {
let calendars = client
.get_collections::<Calendar>()
.await
.map_err(|err| Error::new(ErrorKind::Io, err))?;
Ok(calendars
.into_iter()
.find(|cal| cal.id == href)
.and_then(|cal| cal.color))
}
Property::Calendar(CalendarProperty::Order) => {
let calendars = client
.get_collections::<Calendar>()
.await
.map_err(|err| Error::new(ErrorKind::Io, err))?;
Ok(calendars
.into_iter()
.find(|cal| cal.id == href)
.map(|cal| cal.sort_order.to_string()))
}
Property::AddressBook(_) => Err(Error::new(
ErrorKind::InvalidInput,
"AddressBook properties not supported by Calendar storage",
)),
}
}
async fn set_property(&self, _href: &str, _property: Property, _value: &str) -> Result<()> {
Err(Error::new(ErrorKind::Unsupported, "Not yet implemented"))
}
async fn unset_property(&self, _href: &str, _property: Property) -> Result<()> {
Err(Error::new(ErrorKind::Unsupported, "Not yet implemented"))
}
fn href_for_collection_id(&self, id: &CollectionId) -> Result<Href> {
Ok(id.to_string())
}
}