use alloc::{string::String, sync::Arc, vec::Vec};
use core::sync::atomic::AtomicBool;
use std::{
io::{self, ErrorKind, Read, Write},
sync::mpsc::Sender,
};
use io_jmap::{
client::{JmapClientStd as InnerJmapClientStd, JmapClientStdError as InnerJmapClientStdError},
coroutine::*,
rfc8620::{JmapMethodError, JmapSession, changes::JmapChangesError},
rfc8621::{
email::{changes::JmapEmailChangesOptions, get::JmapEmailGetOptions},
mailbox::{
changes::{JmapMailboxChangesError, JmapMailboxChangesOptions},
get::JmapMailboxGetOptions,
},
},
};
#[cfg(any(
feature = "rustls-ring",
feature = "rustls-aws",
feature = "native-tls"
))]
use pimalaya_stream::tls::Tls;
use secrecy::SecretString;
use thiserror::Error;
use url::Url;
#[cfg(feature = "search")]
use crate::{
envelope::jmap::search::{JmapEnvelopeSearch, JmapEnvelopeSearchError},
search::query::SearchEmailsQuery,
};
use crate::{
envelope::{
event::WatchEvent,
jmap::{
diff as envelope_diff,
list::{JmapEnvelopeList, JmapEnvelopeListError},
watch::{JmapWatchMailbox, JmapWatchMailboxError, JmapWatchMailboxYield},
},
types::{Envelope, EnvelopeDiff, FlagUpdate},
},
flag::{
jmap::store::{JmapFlagStore, JmapFlagStoreError},
types::{Flag, FlagOp},
},
jmap::convert::{envelope_from, envelope_properties},
mailbox::{
jmap::{
create::{JmapMailboxCreate, JmapMailboxCreateError},
delete::{JmapMailboxDelete, JmapMailboxDeleteError},
list::{JmapMailboxList, JmapMailboxListError},
},
types::{Mailbox, MailboxDiff},
},
message::jmap::{
add::{JmapMessageAdd, JmapMessageAddError},
copy::{JmapMessageCopy, JmapMessageCopyError},
delete::{JmapMessageDelete, JmapMessageDeleteError},
get::{JmapMessageGet, JmapMessageGetError},
r#move::{JmapMessageMove, JmapMessageMoveError},
send::{JmapMessageSend, JmapMessageSendError},
},
};
#[derive(Debug, Error)]
pub enum JmapClientError {
#[error(transparent)]
Io(#[from] io::Error),
#[error("JMAP session is not initialised; call connect or session_get first")]
MissingSession,
#[error(transparent)]
MailboxList(#[from] JmapMailboxListError),
#[error(transparent)]
EnvelopeList(#[from] JmapEnvelopeListError),
#[cfg(feature = "search")]
#[error(transparent)]
EnvelopeSearch(#[from] JmapEnvelopeSearchError),
#[error(transparent)]
FlagStore(#[from] JmapFlagStoreError),
#[error(transparent)]
MailboxCreate(#[from] JmapMailboxCreateError),
#[error(transparent)]
MailboxDelete(#[from] JmapMailboxDeleteError),
#[error(transparent)]
MessageAdd(#[from] JmapMessageAddError),
#[error(transparent)]
MessageCopy(#[from] JmapMessageCopyError),
#[error(transparent)]
MessageDelete(#[from] JmapMessageDeleteError),
#[error(transparent)]
MessageGet(#[from] JmapMessageGetError),
#[error(transparent)]
MessageMove(#[from] JmapMessageMoveError),
#[error(transparent)]
MessageSend(#[from] JmapMessageSendError),
#[error(transparent)]
WatchMailbox(#[from] JmapWatchMailboxError),
#[error(transparent)]
Inner(#[from] InnerJmapClientStdError),
}
const READ_BUFFER_SIZE: usize = 16 * 1024;
pub struct JmapClientStd {
pub inner: InnerJmapClientStd,
pub identity_id: Option<String>,
pub drafts_mailbox_id: Option<String>,
}
impl JmapClientStd {
pub fn new<S: Read + Write + Send + 'static>(stream: S, http_auth: SecretString) -> Self {
Self {
inner: InnerJmapClientStd::new(stream, http_auth),
identity_id: None,
drafts_mailbox_id: None,
}
}
pub fn run<C, T, E>(&mut self, mut coroutine: C) -> Result<T, JmapClientError>
where
C: JmapCoroutine<Yield = JmapYield, Return = Result<T, E>>,
JmapClientError: From<E>,
{
let mut buf = [0u8; READ_BUFFER_SIZE];
let mut arg: Option<&[u8]> = None;
loop {
match coroutine.resume(arg.take()) {
JmapCoroutineState::Complete(Ok(out)) => return Ok(out),
JmapCoroutineState::Complete(Err(err)) => return Err(err.into()),
JmapCoroutineState::Yielded(JmapYield::WantsRead) => {
let n = self.inner.stream.read(&mut buf)?;
arg = Some(&buf[..n]);
}
JmapCoroutineState::Yielded(JmapYield::WantsWrite(bytes)) => {
self.inner.stream.write_all(&bytes)?;
}
}
}
}
pub fn session_get(&mut self, url: &Url) -> Result<(), JmapClientError> {
self.inner.session_get(url)?;
Ok(())
}
pub fn list_mailboxes(&mut self, with_counts: bool) -> Result<Vec<Mailbox>, JmapClientError> {
let coroutine = {
let session = self.session_or_err()?;
let http_auth = &self.inner.http_auth;
JmapMailboxList::new(session, http_auth, with_counts)?
};
self.run(coroutine)
}
pub fn list_envelopes(
&mut self,
mailbox: &str,
page: Option<u32>,
page_size: Option<u32>,
) -> Result<Vec<Envelope>, JmapClientError> {
let coroutine = {
let session = self.session_or_err()?;
let http_auth = &self.inner.http_auth;
JmapEnvelopeList::new(session, http_auth, mailbox, page, page_size)?
};
self.run(coroutine)
}
#[cfg(feature = "search")]
pub fn search_envelopes(
&mut self,
mailbox: &str,
query: Option<&SearchEmailsQuery>,
page: Option<u32>,
page_size: Option<u32>,
) -> Result<Vec<Envelope>, JmapClientError> {
let coroutine = {
let session = self.session_or_err()?;
let http_auth = &self.inner.http_auth;
JmapEnvelopeSearch::new(session, http_auth, mailbox, query, page, page_size)?
};
self.run(coroutine)
}
pub fn store_flags(
&mut self,
mailbox: &str,
ids: &[&str],
flags: &[Flag],
op: FlagOp,
) -> Result<(), JmapClientError> {
let coroutine = {
let session = self.session_or_err()?;
let http_auth = &self.inner.http_auth;
JmapFlagStore::new(session, http_auth, mailbox, ids, flags, op)?
};
self.run(coroutine)
}
pub fn get_message(&mut self, mailbox: &str, id: &str) -> Result<Vec<u8>, JmapClientError> {
let coroutine = {
let session = self.session_or_err()?;
let http_auth = &self.inner.http_auth;
JmapMessageGet::new(session, http_auth, mailbox, id)?
};
self.run(coroutine)
}
pub fn add_message(
&mut self,
mailbox: &str,
flags: &[Flag],
raw: Vec<u8>,
) -> Result<String, JmapClientError> {
let coroutine = {
let session = self.session_or_err()?;
let http_auth = &self.inner.http_auth;
JmapMessageAdd::new(session, http_auth, mailbox, flags, raw)?
};
self.run(coroutine)
}
pub fn create_mailbox(&mut self, name: &str) -> Result<(), JmapClientError> {
let coroutine = {
let session = self.session_or_err()?;
let http_auth = &self.inner.http_auth;
JmapMailboxCreate::new(session, http_auth, name)?
};
self.run(coroutine)
}
pub fn delete_mailbox(&mut self, name: &str) -> Result<(), JmapClientError> {
let coroutine = {
let session = self.session_or_err()?;
let http_auth = &self.inner.http_auth;
JmapMailboxDelete::new(session, http_auth, name)?
};
self.run(coroutine)
}
pub fn delete_message(&mut self, mailbox: &str, id: &str) -> Result<(), JmapClientError> {
let coroutine = {
let session = self.session_or_err()?;
let http_auth = &self.inner.http_auth;
JmapMessageDelete::new(session, http_auth, mailbox, id)?
};
self.run(coroutine)
}
pub fn copy_messages(
&mut self,
from: &str,
to: &str,
ids: &[&str],
) -> Result<(), JmapClientError> {
let coroutine = {
let session = self.session_or_err()?;
let http_auth = &self.inner.http_auth;
JmapMessageCopy::new(session, http_auth, from, to, ids)?
};
self.run(coroutine)
}
pub fn move_messages(
&mut self,
from: &str,
to: &str,
ids: &[&str],
) -> Result<(), JmapClientError> {
let coroutine = {
let session = self.session_or_err()?;
let http_auth = &self.inner.http_auth;
JmapMessageMove::new(session, http_auth, from, to, ids)?
};
self.run(coroutine)
}
pub fn send_message(&mut self, raw: Vec<u8>) -> Result<(), JmapClientError> {
let coroutine = {
let session = self.session_or_err()?;
let http_auth = &self.inner.http_auth;
let identity_id = self
.identity_id
.as_deref()
.ok_or(JmapClientError::MissingSession)?;
let drafts_id = self
.drafts_mailbox_id
.as_deref()
.ok_or(JmapClientError::MissingSession)?;
JmapMessageSend::new(session, http_auth, identity_id, drafts_id, raw)?
};
self.run(coroutine)
}
pub fn watch_mailbox(
&mut self,
mailbox: &str,
shutdown: Arc<AtomicBool>,
tx: Sender<WatchEvent>,
) -> Result<(), JmapClientError> {
let mut coroutine = {
let session = self.session_or_err()?;
let http_auth = &self.inner.http_auth;
JmapWatchMailbox::new(session, http_auth, mailbox, shutdown)?
};
let mut buf = [0u8; READ_BUFFER_SIZE];
let mut bytes: Option<&[u8]> = None;
loop {
match coroutine.resume(bytes) {
JmapCoroutineState::Complete(result) => return Ok(result?),
JmapCoroutineState::Yielded(JmapWatchMailboxYield::WantsRead) => {
match self.inner.stream.read(&mut buf) {
Ok(n) => bytes = Some(&buf[..n]),
Err(err) if err.kind() == ErrorKind::WouldBlock => bytes = None,
Err(err) if err.kind() == ErrorKind::TimedOut => bytes = None,
Err(err) => return Err(err.into()),
}
}
JmapCoroutineState::Yielded(JmapWatchMailboxYield::WantsWrite(out)) => {
self.inner.stream.write_all(&out)?;
bytes = None;
}
JmapCoroutineState::Yielded(JmapWatchMailboxYield::Event(evt)) => {
if tx.send(evt).is_err() {
return Ok(());
}
bytes = None;
}
}
}
}
pub fn diff_envelopes(
&mut self,
_mailbox: &str,
state: Option<&[u8]>,
) -> Result<EnvelopeDiff, JmapClientError> {
let Some(since_state) = state.and_then(envelope_diff::decode) else {
return self.diff_baseline();
};
let mut created_ids: Vec<String> = Vec::new();
let mut updated_ids: Vec<String> = Vec::new();
let mut destroyed_ids: Vec<String> = Vec::new();
let mut cursor = since_state;
loop {
let changes = match self
.inner
.email_changes(cursor.clone(), JmapEmailChangesOptions::default())
{
Ok(c) => c,
Err(err) if envelope_diff::is_cannot_calculate_changes(&err) => {
return self.diff_baseline();
}
Err(err) => return Err(err.into()),
};
created_ids.extend(changes.created);
updated_ids.extend(changes.updated);
destroyed_ids.extend(changes.destroyed);
cursor = changes.new_state;
if !changes.has_more_changes {
break;
}
}
let properties = envelope_properties();
let new_envelopes = if created_ids.is_empty() {
Vec::new()
} else {
let opts = JmapEmailGetOptions {
properties: Some(properties.clone()),
..Default::default()
};
let output = self.inner.email_get(created_ids, opts)?;
output.emails.into_iter().map(envelope_from).collect()
};
let flag_updates = if updated_ids.is_empty() {
Vec::new()
} else {
let opts = JmapEmailGetOptions {
properties: Some(properties),
..Default::default()
};
let output = self.inner.email_get(updated_ids, opts)?;
output
.emails
.into_iter()
.map(envelope_from)
.map(|env| FlagUpdate {
id: env.id,
flags: env.flags,
})
.collect()
};
Ok(EnvelopeDiff::Incremental {
new_state: envelope_diff::encode(&cursor),
flag_updates,
new_envelopes,
vanished_ids: destroyed_ids,
})
}
pub fn diff_mailboxes(&mut self, state: Option<&[u8]>) -> Result<MailboxDiff, JmapClientError> {
let Some(since_state) = state.and_then(envelope_diff::decode) else {
let opts = JmapMailboxGetOptions {
ids: Some(Vec::new()),
..Default::default()
};
let output = self.inner.mailbox_get(opts)?;
return Ok(MailboxDiff::Changed {
new_state: Some(envelope_diff::encode(&output.new_state)),
});
};
match self
.inner
.mailbox_changes(since_state, JmapMailboxChangesOptions::default())
{
Ok(changes)
if !changes.has_more_changes
&& changes.created.is_empty()
&& changes.updated.is_empty()
&& changes.destroyed.is_empty() =>
{
Ok(MailboxDiff::Unchanged {
new_state: envelope_diff::encode(&changes.new_state),
})
}
Ok(changes) => Ok(MailboxDiff::Changed {
new_state: Some(envelope_diff::encode(&changes.new_state)),
}),
Err(InnerJmapClientStdError::MailboxChanges(JmapMailboxChangesError::Changes(
JmapChangesError::Method(JmapMethodError::CannotCalculateChanges { .. }),
))) => Ok(MailboxDiff::Changed { new_state: None }),
Err(err) => Err(err.into()),
}
}
fn diff_baseline(&mut self) -> Result<EnvelopeDiff, JmapClientError> {
let output = self
.inner
.email_get(Vec::new(), JmapEmailGetOptions::default())?;
Ok(EnvelopeDiff::FullListRequired {
new_state: Some(envelope_diff::encode(&output.new_state)),
})
}
fn session_or_err(&self) -> Result<&JmapSession, JmapClientError> {
self.inner
.session
.as_ref()
.ok_or(JmapClientError::MissingSession)
}
}
#[cfg(any(
feature = "rustls-ring",
feature = "rustls-aws",
feature = "native-tls"
))]
impl JmapClientStd {
pub fn connect(url: &Url, tls: &Tls, http_auth: SecretString) -> Result<Self, JmapClientError> {
let mut inner = InnerJmapClientStd::connect(url, tls, http_auth)?;
inner.session_get(url)?;
Ok(Self {
inner,
identity_id: None,
drafts_mailbox_id: None,
})
}
}