use imap_proto::types::UidSetMember;
use log::{error, info, trace, warn};
use maildir::Maildir;
use std::collections::BTreeSet;
use std::fs;
use std::io;
use crate::flags;
use crate::mailboxes;
use crate::seqset::SeqSet;
use crate::state;
use crate::SyncError;
/// Deletes the given UIDs from the currently selected remote mailbox.
///
/// The messages are first flagged `\Deleted` (silently) and then removed with
/// `UID EXPUNGE`. Returns the mod sequence carried by the expunge result, if
/// any; a warning is logged when it is absent.
///
/// # Errors
///
/// Fails if either IMAP command fails.
fn imap_delete_uids<T: io::Read + io::Write>(
    session: &mut imap::Session<T>,
    uids: &SeqSet,
) -> Result<Option<u64>, SyncError> {
    trace!("deleting remote UIDs {}", uids);

    // Render the set once; both commands take the same UID set string.
    let uid_set = uids.to_string();
    session.uid_store(uid_set.as_str(), "+FLAGS.SILENT (\\Deleted)")?;
    let expunged = session.uid_expunge(uid_set.as_str())?;

    let mod_seq = expunged.mod_seq;
    if mod_seq.is_none() {
        warn!("EXPUNGE response without MODSEQ");
    }
    Ok(mod_seq)
}
fn imap_apply_local_changes<T: io::Read + io::Write>(
session: &mut imap::Session<T>,
mailbox: &String,
state: &mut state::SyncState,
maildir: &maildir::Maildir,
) -> Result<(), SyncError> {
trace!("pushing local changes (if any)");
while let Some((uid, local_modification)) = state.local_changes.modified.pop_first() {
let imap_flags =
flags::to_imap_list(&flags::maildir_to_imap(local_modification.new_flags()));
trace!("updating remote flags for UID {}", uid);
let store = session.uid_store(
format!("{}", uid),
format!(
"(UNCHANGEDSINCE {}) FLAGS ({})",
state.last_seen.highest_mod_seq, imap_flags
),
)?;
for s in store.iter() {
if let Some(modseq) = s.mod_seq() {
state.last_seen.highest_mod_seq =
std::cmp::max(state.last_seen.highest_mod_seq, modseq);
} else {
warn!("{}: store response without MODSEQ", mailbox);
}
}
state.apply(uid, state::LocalChange::Modification(local_modification));
}
if !state.local_changes.deleted.is_empty() {
let set = SeqSet::from_iter(state.local_changes.deleted.keys().cloned());
if let Some(mod_seq) = imap_delete_uids(session, &set)? {
state.last_seen.highest_mod_seq = mod_seq;
}
while let Some((uid, local_deletion)) = state.local_changes.deleted.pop_first() {
state.apply(uid, state::LocalChange::Deletion(local_deletion));
}
}
while let Some(local_addition) = state.local_changes.added.pop() {
trace!("uploading local mail {}", local_addition.id());
let mail = maildir
.find(local_addition.id())
.expect("new mail not found on disk, inconsistent state, giving up");
let data = fs::read(mail.path())?;
let flags = flags::maildir_to_imap(mail.flags());
let a = session.append(mailbox, &data).flags(flags).finish()?;
match a.uids {
Some(uids) => {
for set in uids {
match set {
UidSetMember::UidRange(_) => {
warn!("Unexpected UID range in APPEND response");
}
UidSetMember::Uid(uid) => {
state.apply(uid, state::LocalChange::Addition(local_addition));
break;
}
}
}
}
None => {
return Err(SyncError::Error("APPEND returned no UIDs"));
}
}
let mut need_mod_seq = true;
for unsol in session.unsolicited_responses.try_iter() {
match unsol {
imap::types::UnsolicitedResponse::Ok { code, .. } => {
if let Some(imap_proto::types::ResponseCode::HighestModSeq(seq)) = code {
state.last_seen.highest_mod_seq = seq;
need_mod_seq = false;
}
}
imap::types::UnsolicitedResponse::Exists(_)
| imap::types::UnsolicitedResponse::Recent(_) => (),
_ => trace!("{}: unhandled unsolicited response: {:?}", mailbox, unsol),
}
}
if need_mod_seq {
trace!(
"{}: sending final EXAMINE for highest mod sequence",
mailbox
);
let e = session.examine(mailbox)?;
match e.highest_mod_seq {
Some(mod_seq) => state.last_seen.highest_mod_seq = mod_seq,
None => warn!("Failed to get highest mod sequence after uploads!"),
}
}
}
Ok(())
}
/// Brings the local maildir in line with the remote mailbox (remote wins).
///
/// Pending local changes recorded in `state` are discarded first. The function
/// then either returns early when nothing changed remotely, performs a full
/// download when the state is empty, or refreshes the flags of known mails,
/// deletes mails that disappeared remotely and downloads the unknown ones.
/// On success the remote UIDVALIDITY and HIGHESTMODSEQ are stored in `state`
/// and the state is saved.
///
/// # Errors
///
/// Returns a `SyncError` when the server provides no UIDVALIDITY or omits a
/// UID, when an IMAP command fails, when a downloaded message has no body, or
/// when a maildir or state operation fails.
pub(crate) fn pull<T: io::Read + io::Write>(
session: &mut imap::Session<T>,
sync_job: &mailboxes::SyncJob,
maildir: Maildir,
remote: imap::types::Mailbox,
mut state: state::SyncState,
) -> Result<(), SyncError> {
let mailbox = &sync_job.name;
trace!("Discarding local state during pull");
// The result of discarding the local changes seeds `refresh_needed`.
let mut refresh_needed = state.discard_local_changes()?;
let new_uid_validity = remote.uid_validity.ok_or(SyncError::Error(
"Server is incompatible: no UIDVALIDITY provided",
))?;
// A missing HIGHESTMODSEQ is mapped to 0, which forces a refresh below.
let new_highest_mod_seq: u64 = remote.highest_mod_seq.unwrap_or(0);
if new_highest_mod_seq == 0 {
refresh_needed = true;
warn!("No highest_mod_seq for mailbox {}", mailbox);
}
// A recorded mod sequence of 0 forces a refresh as well.
if state.last_seen.highest_mod_seq == 0 {
refresh_needed = true;
}
// Fast path: same UIDVALIDITY and HIGHESTMODSEQ as last time, no refresh.
if new_highest_mod_seq == state.last_seen.highest_mod_seq
&& new_uid_validity == state.last_seen.uid_validity
&& !refresh_needed
{
trace!("Nothing to do for {}", mailbox);
state.save()?;
return Ok(());
}
// UIDVALIDITY changed (and this is not the initial creation of the local
// side): clear the state, persist that, and start over from a fresh load.
if new_uid_validity != state.last_seen.uid_validity
&& !matches!(sync_job.action, mailboxes::SyncAction::CreateLocal)
{
info!("UID validity change in {} forces full resync", mailbox);
state.clear();
state.save()?;
state = state::SyncState::load(&maildir.path())?;
state.discard_local_changes()?;
state.last_seen.highest_mod_seq = 0;
}
trace!("In {}: {:?}", mailbox, state.local_changes);
// `fetch` ends up holding the messages whose bodies must be stored locally.
let fetch = if state.is_empty() {
// Nothing known locally: download every message including its body.
trace!("Performing full fetch for {}", mailbox);
session.uid_fetch("1:*", "(FLAGS UID BODY.PEEK[])")?
} else {
// First fetch flags only: either for every message (refresh) or just for
// those changed since the recorded mod sequence (CHANGEDSINCE/VANISHED).
let f = if refresh_needed {
trace!("Performing refresh fetch for {}", mailbox);
session.uid_fetch("1:*", "(FLAGS UID)")?
} else {
trace!(
"Performing refresh fetch for {} ({} -> {})",
mailbox,
state.last_seen.highest_mod_seq,
new_highest_mod_seq
);
session.uid_fetch(
"1:*",
format!(
"(FLAGS UID) (CHANGEDSINCE {} VANISHED)",
state.last_seen.highest_mod_seq
),
)?
};
let mut to_fetch = SeqSet::new();
let mut all_remote_uids: BTreeSet<u32> = BTreeSet::new();
let all_local_uids: BTreeSet<u32> = state.uids();
for mail in f.iter() {
let uid = mail
.uid
.ok_or(SyncError::Error("Server did not send UID, giving up"))?;
// Only a refresh lists every remote message, so only then can the
// collected UIDs be used to detect remote deletions further down.
if refresh_needed {
all_remote_uids.insert(uid);
}
match &state.get(&uid) {
Some((id, local_flags)) => {
// Known mail: record the remote flags in the state if they differ,
// then apply them to the file in the maildir.
let id = id.clone();
let remote_flags = flags::imap_to_maildir(mail.flags());
if local_flags != &remote_flags {
state.safe_update(uid, &id, &remote_flags)?;
}
// If setting the flags fails, fall back to moving the mail from
// `new` to `cur` with the flags; only if both fail give up.
if maildir.set_flags(&id, &remote_flags).is_err() {
if maildir
.move_new_to_cur_with_flags(&id, &remote_flags)
.is_err()
{
return Err(SyncError::E(format!(
"Failed to set flags for {} - inconsistent state?",
id
)));
}
}
}
None => {
// Unknown mail: remember it for the body download below.
to_fetch.insert(uid);
}
}
}
// Removes a mail locally (state and maildir) after it vanished remotely.
// NOTE(review): an `Err` from `state.safe_delete` is silently ignored here
// because of the `if let Ok(..)` pattern, whereas `sync` propagates it with
// `?` - confirm this difference is intended.
let mut delete_local = |uid: &u32| -> Result<(), SyncError> {
trace!("{}: mail deleted on remote side: UID {}", mailbox, uid);
if let Ok(Some(local)) = state.safe_delete(uid) {
maildir.delete(&local).map_err(|_| {
SyncError::Error("Error deleting mail locally - inconsistent state?")
})?;
}
Ok(())
};
// VANISHED notifications are read from the unsolicited response queue.
for unsol in session.unsolicited_responses.try_iter() {
if let imap::types::UnsolicitedResponse::Vanished { earlier: _, uids } = unsol {
for uid in uids.into_iter().flatten() {
delete_local(&uid)?;
}
} else {
trace!("{}: unhandled unsolicited response: {:?}", mailbox, unsol);
}
}
// After a refresh, every locally known UID that the server did not list
// is treated as deleted on the remote side.
if refresh_needed {
let deleted = all_local_uids.difference(&all_remote_uids);
for uid in deleted {
delete_local(uid)?;
}
}
// Finally download the bodies of the mails not yet known locally.
session.uid_fetch(format!("{}", to_fetch), "(FLAGS UID BODY.PEEK[])")?
};
for mail in fetch.iter() {
let body = mail
.body()
.ok_or(SyncError::Error("downloaded message without content"))?;
let uid = mail
.uid
.ok_or(SyncError::Error("Server did not send UID, giving up"))?;
// Track the highest UID seen so far.
if uid > state.last_seen.uid {
state.last_seen.uid = uid;
}
// A message counts as recent when it has no flags at all or when its
// first flag is `\Recent`.
// NOTE(review): only the first flag is inspected - confirm that is enough.
let recent = (mail.flags().is_empty()) || (mail.flags()[0] == imap::types::Flag::Recent);
if !recent {
// Non-recent mail is stored in `cur` together with its flags.
let flags = flags::imap_to_maildir(mail.flags());
match maildir.store_cur_with_flags(body, &flags) {
Ok(newid) => {
state.insert(uid, (newid, flags));
}
Err(e) => {
// Save the state before bailing out with the maildir error.
error!("{}: error saving new message: {}", mailbox, e);
state.save()?;
return Err(SyncError::MaildirError(e));
}
};
} else {
// Recent mail is stored in `new` and recorded with empty flags.
match maildir.store_new(body) {
Ok(newid) => {
state.insert(uid, (newid, String::from("")));
}
Err(e) => {
// Save the state before bailing out with the maildir error.
error!("{}: error saving new message: {}", mailbox, e);
state.save()?;
return Err(SyncError::MaildirError(e));
}
};
}
}
// Record what the server reported when the mailbox was selected.
state.last_seen.highest_mod_seq = new_highest_mod_seq;
state.last_seen.uid_validity = new_uid_validity;
state.save()?;
Ok(())
}
/// Brings the remote mailbox in line with the local maildir (local wins).
///
/// In order, the function:
///
/// 1. returns early (only saving `state`) when UIDVALIDITY and HIGHESTMODSEQ
///    are unchanged, no refresh is required and there are no local changes;
/// 2. on a UIDVALIDITY change clears and reloads the state; unless the job
///    action is `CreateRemote`, the whole remote mailbox is then cleared;
/// 3. otherwise resets remote flags that differ from the locally recorded
///    ones, collects remote mails unknown to the state for deletion, and
///    queues mails that disappeared remotely for re-upload;
/// 4. deletes the collected remote mails, pushes the pending local changes
///    with `imap_apply_local_changes` and saves the state.
///
/// # Errors
///
/// Returns a `SyncError` when the server provides no UIDVALIDITY or omits a
/// UID, when an IMAP command fails, or when a state operation fails.
pub(crate) fn push<T: io::Read + io::Write>(
    session: &mut imap::Session<T>,
    sync_job: &mailboxes::SyncJob,
    maildir: Maildir,
    remote: imap::types::Mailbox,
    mut state: state::SyncState,
) -> Result<(), SyncError> {
    let mailbox = &sync_job.name;
    let mut refresh_needed = false;
    let new_uid_validity = remote.uid_validity.ok_or(SyncError::Error(
        "Server is incompatible: no UIDVALIDITY provided",
    ))?;
    // A missing HIGHESTMODSEQ is mapped to 0, which forces a refresh.
    let new_highest_mod_seq: u64 = remote.highest_mod_seq.unwrap_or(0);
    if new_highest_mod_seq == 0 {
        refresh_needed = true;
        warn!("No highest_mod_seq for mailbox {}", mailbox);
    }
    if state.last_seen.highest_mod_seq == 0 {
        refresh_needed = true;
    }

    // Fast path: nothing changed on either side.
    if new_highest_mod_seq == state.last_seen.highest_mod_seq
        && new_uid_validity == state.last_seen.uid_validity
        && !refresh_needed
        && !state.has_local_changes()
    {
        trace!(
            "Nothing to do for {} ({}/{})",
            mailbox,
            new_uid_validity,
            new_highest_mod_seq
        );
        state.save()?;
        return Ok(());
    }

    if new_uid_validity != state.last_seen.uid_validity {
        info!("UID validity change in {} forces full resync", mailbox);
        state.clear();
        state.save()?;
        state = state::SyncState::load(&maildir.path())?;
        state.last_seen.highest_mod_seq = 0;
        refresh_needed = true;
        // A freshly created remote mailbox has nothing to clear: adopt its
        // UIDVALIDITY so the "clear entire mailbox" branch below is skipped.
        if matches!(sync_job.action, mailboxes::SyncAction::CreateRemote) {
            state.last_seen.uid_validity = new_uid_validity;
        }
    }
    trace!("In {}: {:?}", mailbox, state.local_changes);

    // Highest mod sequence known for the mailbox. It starts at the value
    // reported on SELECT and is raised by the answers to our own STORE and
    // EXPUNGE commands, so that the conditional stores issued later by
    // `imap_apply_local_changes` are not rejected because of changes this
    // function made itself.
    let mut latest_mod_seq = new_highest_mod_seq;

    let deletion_seqset = if state.last_seen.uid_validity != new_uid_validity {
        trace!("Clearing entire mailbox {}", mailbox);
        SeqSet::from([1, u32::MAX])
    } else {
        // Fetch flags either for every message (refresh) or only for those
        // changed since the recorded mod sequence (CHANGEDSINCE/VANISHED).
        let f = if refresh_needed {
            trace!("Performing refresh fetch for {}", mailbox);
            session.uid_fetch("1:*", "(FLAGS UID)")?
        } else {
            trace!(
                "Performing refresh fetch for {} ({} -> {})",
                mailbox,
                state.last_seen.highest_mod_seq,
                new_highest_mod_seq
            );
            session.uid_fetch(
                "1:*",
                format!(
                    "(FLAGS UID) (CHANGEDSINCE {} VANISHED)",
                    state.last_seen.highest_mod_seq
                ),
            )?
        };
        let mut all_remote_uids: BTreeSet<u32> = BTreeSet::new();
        let all_local_uids: BTreeSet<u32> = state.uids();
        let mut to_delete = SeqSet::new();
        for mail in f.iter() {
            let uid = mail
                .uid
                .ok_or(SyncError::Error("Server did not send UID, giving up"))?;
            if refresh_needed {
                all_remote_uids.insert(uid);
            }
            match &state.get(&uid) {
                Some((_, local_flags)) => {
                    let remote_flags = flags::imap_to_maildir(mail.flags());
                    if local_flags != &remote_flags {
                        // The state records maildir flag letters; translate
                        // them to an IMAP flag list before sending them.
                        let imap_flags =
                            flags::to_imap_list(&flags::maildir_to_imap(local_flags.as_str()));
                        let store = session.uid_store(
                            uid.to_string(),
                            format!(
                                "(UNCHANGEDSINCE {}) FLAGS ({})",
                                new_highest_mod_seq, imap_flags
                            ),
                        )?;
                        for s in store.iter() {
                            match s.mod_seq() {
                                Some(modseq) => latest_mod_seq = latest_mod_seq.max(modseq),
                                None => warn!("store response without MODSEQ"),
                            }
                        }
                    }
                }
                None => {
                    // Remote mail the state does not know about: delete it.
                    to_delete.insert(uid);
                }
            }
        }

        // Moves a mail that disappeared remotely back into the queue of local
        // additions so that it is uploaded again.
        let restore_mail = |uid: &u32, state: &mut state::SyncState| -> Result<(), SyncError> {
            trace!(
                "{}: restoring mail deleted on remote side: UID {}",
                mailbox,
                uid
            );
            if let Some((id, flags)) = state.remove(uid) {
                state
                    .local_changes
                    .added
                    .push(state::LocalAddition::new(id, flags));
            }
            Ok(())
        };

        // VANISHED notifications are read from the unsolicited response queue.
        for unsol in session.unsolicited_responses.try_iter() {
            if let imap::types::UnsolicitedResponse::Vanished { earlier: _, uids } = unsol {
                for uid in uids.into_iter().flatten() {
                    restore_mail(&uid, &mut state)?;
                }
            } else {
                trace!("{}: unhandled unsolicited response: {:?}", mailbox, unsol);
            }
        }
        // After a refresh, every locally known UID that the server did not
        // list is treated as deleted remotely and restored.
        if refresh_needed {
            for uid in all_local_uids.difference(&all_remote_uids) {
                restore_mail(uid, &mut state)?;
            }
        }
        to_delete
    };

    if !deletion_seqset.is_empty() {
        if let Some(mod_seq) = imap_delete_uids(session, &deletion_seqset)? {
            latest_mod_seq = latest_mod_seq.max(mod_seq);
        }
    }

    state.last_seen.highest_mod_seq = latest_mod_seq;
    state.last_seen.uid_validity = new_uid_validity;
    imap_apply_local_changes(session, mailbox, &mut state, &maildir)?;
    state.save()?;
    Ok(())
}
/// Two-way synchronization between the remote mailbox and the local maildir.
///
/// Remote changes are applied locally first (flag updates, deletions and
/// downloads of unknown mails); afterwards the pending local changes are
/// pushed with `imap_apply_local_changes` and the state is saved.
///
/// A UIDVALIDITY change is only accepted when the job action is
/// `CreateRemote` or when there are no pending local changes; otherwise the
/// function fails, as the error message states that resynchronization is
/// impossible.
///
/// # Errors
///
/// Returns a `SyncError` when the server provides no UIDVALIDITY or omits a
/// UID, when UIDVALIDITY changed while local changes are pending, when an
/// IMAP command fails, when a downloaded message has no body, or when a
/// maildir or state operation fails.
pub(crate) fn sync<T: io::Read + io::Write>(
session: &mut imap::Session<T>,
sync_job: &mailboxes::SyncJob,
maildir: Maildir,
remote: imap::types::Mailbox,
mut state: state::SyncState,
) -> Result<(), SyncError> {
let mailbox = &sync_job.name;
let mut refresh_needed = false;
let new_uid_validity = remote.uid_validity.ok_or(SyncError::Error(
"Server is incompatible: no UIDVALIDITY provided",
))?;
// A missing HIGHESTMODSEQ is mapped to 0, which forces a refresh below.
let new_highest_mod_seq: u64 = remote.highest_mod_seq.unwrap_or(0);
if new_highest_mod_seq == 0 {
refresh_needed = true;
warn!("No highest_mod_seq for mailbox {}", mailbox);
}
// A recorded mod sequence of 0 forces a refresh as well.
if state.last_seen.highest_mod_seq == 0 {
refresh_needed = true;
}
// Fast path: nothing changed on either side.
if new_highest_mod_seq == state.last_seen.highest_mod_seq
&& new_uid_validity == state.last_seen.uid_validity
&& !refresh_needed
&& !state.has_local_changes()
{
trace!("Nothing to do for {}", mailbox);
state.save()?;
return Ok(());
}
if new_uid_validity != state.last_seen.uid_validity {
if matches!(sync_job.action, mailboxes::SyncAction::CreateRemote) {
// Newly created remote mailbox: simply adopt its UIDVALIDITY.
state.last_seen.uid_validity = new_uid_validity;
} else {
// Pending local changes cannot be applied after a UIDVALIDITY change,
// so refuse to continue.
if state.has_local_changes() {
let msg = format!("UID validity change in {} with pending local changes makes resynchronization impossible", mailbox);
error!("{}", &msg);
return Err(SyncError::E(msg));
}
// Otherwise clear the state, persist that, and start over from a
// fresh load.
info!("UID validity change in {} forces full resync", mailbox);
state.clear();
state.save()?;
state = state::SyncState::load(&maildir.path())?;
state.discard_local_changes()?;
state.last_seen.highest_mod_seq = 0;
}
}
trace!("In {}: {:?}", mailbox, state.local_changes);
// `fetch` ends up holding the messages whose bodies must be stored locally.
let fetch = if state.is_empty() {
// Nothing known locally: download every message including its body.
trace!("Performing full fetch for {}", mailbox);
session.uid_fetch("1:*", "(FLAGS UID BODY.PEEK[])")?
} else {
// First fetch flags only: either for every message (refresh) or just for
// those changed since the recorded mod sequence (CHANGEDSINCE/VANISHED).
let f = if refresh_needed {
trace!("Performing refresh fetch for {}", mailbox);
session.uid_fetch("1:*", "(FLAGS UID)")?
} else {
trace!(
"Performing refresh fetch for {} ({} -> {})",
mailbox,
state.last_seen.highest_mod_seq,
new_highest_mod_seq
);
session.uid_fetch(
"1:*",
format!(
"(FLAGS UID) (CHANGEDSINCE {} VANISHED)",
state.last_seen.highest_mod_seq
),
)?
};
let mut to_fetch = SeqSet::new();
let mut all_remote_uids: BTreeSet<u32> = BTreeSet::new();
let all_local_uids: BTreeSet<u32> = state.uids();
for mail in f.iter() {
let uid = mail
.uid
.ok_or(SyncError::Error("Server did not send UID, giving up"))?;
// Only a refresh lists every remote message, so only then can the
// collected UIDs be used to detect remote deletions further down.
if refresh_needed {
all_remote_uids.insert(uid);
}
match &state.get(&uid) {
Some((id, local_flags)) => {
// Known mail: record the remote flags in the state if they differ,
// then apply them to the file in the maildir.
let id = id.clone();
let remote_flags = flags::imap_to_maildir(mail.flags());
if local_flags != &remote_flags {
state.safe_update(uid, &id, &remote_flags)?;
}
// If setting the flags fails, fall back to moving the mail from
// `new` to `cur` with the flags; only if both fail give up.
if maildir.set_flags(&id, &remote_flags).is_err() {
if maildir
.move_new_to_cur_with_flags(&id, &remote_flags)
.is_err()
{
return Err(SyncError::E(format!(
"Failed to set flags for {} - inconsistent state?",
id
)));
}
}
}
None => {
// Unknown mail: remember it for the body download below.
to_fetch.insert(uid);
}
}
}
// Removes a mail locally (state and maildir) after it vanished remotely.
// Errors from `state.safe_delete` are propagated to the caller.
let mut delete_mail = |uid: &u32| -> Result<(), SyncError> {
trace!("{}: mail deleted on remote side: UID {}", mailbox, uid);
if let Some(local) = state.safe_delete(uid)? {
maildir.delete(&local).map_err(|_| {
SyncError::Error("Error deleting mail locally - inconsistent state?")
})?;
}
Ok(())
};
// VANISHED notifications are read from the unsolicited response queue.
for unsol in session.unsolicited_responses.try_iter() {
if let imap::types::UnsolicitedResponse::Vanished { earlier: _, uids } = unsol {
for uid in uids.into_iter().flatten() {
delete_mail(&uid)?;
}
} else {
trace!("{}: unhandled unsolicited response: {:?}", mailbox, unsol);
}
}
// After a refresh, every locally known UID that the server did not list
// is treated as deleted on the remote side.
if refresh_needed {
let deleted = all_local_uids.difference(&all_remote_uids);
for uid in deleted {
delete_mail(uid)?;
}
}
// Finally download the bodies of the mails not yet known locally.
session.uid_fetch(format!("{}", to_fetch), "(FLAGS UID BODY.PEEK[])")?
};
for mail in fetch.iter() {
let body = mail
.body()
.ok_or(SyncError::Error("downloaded message without content"))?;
let uid = mail
.uid
.ok_or(SyncError::Error("Server did not send UID, giving up"))?;
// Track the highest UID seen so far.
if uid > state.last_seen.uid {
state.last_seen.uid = uid;
}
// A message counts as recent when it has no flags at all or when its
// first flag is `\Recent`.
// NOTE(review): only the first flag is inspected - confirm that is enough.
let recent = (mail.flags().is_empty()) || (mail.flags()[0] == imap::types::Flag::Recent);
if !recent {
// Non-recent mail is stored in `cur` together with its flags.
let flags = flags::imap_to_maildir(mail.flags());
match maildir.store_cur_with_flags(body, &flags) {
Ok(newid) => {
state.insert(uid, (newid, flags));
}
Err(e) => {
// Save the state before bailing out with the maildir error.
error!("{}: error saving new message: {}", mailbox, e);
state.save()?;
return Err(SyncError::MaildirError(e));
}
};
} else {
// Recent mail is stored in `new` and recorded with empty flags.
match maildir.store_new(body) {
Ok(newid) => {
state.insert(uid, (newid, String::from("")));
}
Err(e) => {
// Save the state before bailing out with the maildir error.
error!("{}: error saving new message: {}", mailbox, e);
state.save()?;
return Err(SyncError::MaildirError(e));
}
};
}
}
// Record what the server reported when the mailbox was selected, then push
// the pending local changes on top of that baseline.
state.last_seen.highest_mod_seq = new_highest_mod_seq;
state.last_seen.uid_validity = new_uid_validity;
imap_apply_local_changes(session, mailbox, &mut state, &maildir)?;
state.save()?;
Ok(())
}
#[cfg(test)]
#[cfg(feature = "test-full")]
mod tests {
    // Integration tests: they talk to a real IMAP server whose address is
    // taken from TEST_HOST / TEST_IMAPS_PORT (127.0.0.1:3143 when unset).
    extern crate imap;
    extern crate lettre;
    use super::*;
    use tempfile::tempdir;

    /// Test account; used both as user name and as password when logging in.
    const TO: &str = "vsync@localhost";

    /// Host of the test IMAP server.
    fn test_host() -> String {
        std::env::var("TEST_HOST").unwrap_or_else(|_| String::from("127.0.0.1"))
    }

    /// Port of the test IMAP server; falls back to 3143 when the variable is
    /// unset or not a valid port number.
    fn test_imaps_port() -> u16 {
        match std::env::var("TEST_IMAPS_PORT") {
            Ok(raw) => raw.parse().unwrap_or(3143),
            Err(_) => 3143,
        }
    }

    /// Selects INBOX and removes every message from it.
    fn clean_mailbox<T: io::Read + io::Write>(session: &mut imap::Session<T>) {
        session.select("INBOX").unwrap();
        let has_mail = !session.search("ALL").unwrap().is_empty();
        if has_mail {
            session.uid_store("1:*", "+FLAGS (\\Deleted)").unwrap();
        }
        session.expunge().unwrap();
    }

    /// Opens a plaintext, debug-enabled session for `user` with an empty INBOX.
    fn session(user: &str) -> imap::Session<imap::Connection> {
        let server_host = test_host();
        let server_port = test_imaps_port();
        let mut client = imap::ClientBuilder::new(&server_host, server_port)
            .mode(imap::ConnectionMode::Plaintext)
            .connect()
            .unwrap();
        client.debug = true;
        let mut logged_in = client.login(user, user).unwrap();
        logged_in.debug = true;
        clean_mailbox(&mut logged_in);
        logged_in
    }

    /// Runs pull, push and sync one after another against the same mailbox.
    #[test]
    fn sync_actions() {
        let dir = tempdir().unwrap();
        let root = dir.path().to_owned();
        assert!(root.exists());
        let maildir = maildir::Maildir::from(root);
        maildir.create_dirs().unwrap();

        let mut imap_session = session(TO);
        imap_session
            .run_command_and_read_response("ENABLE QRESYNC")
            .unwrap();

        let email: lettre::Message = lettre::message::Message::builder()
            .from("sender@localhost".parse().unwrap())
            .to(TO.parse().unwrap())
            .subject("My second e-mail")
            .body("Hello world".to_string())
            .unwrap()
            .into();
        let mbox = "INBOX";

        // Re-opens the maildir in the temporary directory and loads its state.
        let reopen = || {
            let maildir = maildir::Maildir::from(dir.path().to_owned());
            let state = state::SyncState::load(&maildir.path()).unwrap();
            (maildir, state)
        };
        // Builds a job description for the test mailbox.
        let job_for = |action| mailboxes::SyncJob {
            action,
            name: String::from(mbox),
            delimiter: ".".to_string(),
        };

        // pull: one remote mail ends up in the local maildir.
        let mailbox = imap_session.select(mbox).unwrap();
        imap_session
            .append(mbox, &email.formatted())
            .finish()
            .unwrap();
        let create_job = job_for(mailboxes::SyncAction::CreateLocal);
        let state = state::SyncState::load(&maildir.path()).unwrap();
        pull(&mut imap_session, &create_job, maildir, mailbox, state).unwrap();

        let (maildir, state) = reopen();
        assert_eq!(maildir.count_new() + maildir.count_cur(), 1);
        let inbox = imap_session.uid_search("ALL").unwrap();
        assert_eq!(inbox, state.uids().into_iter().collect());

        // push: the remote mailbox is emptied and must be restored.
        clean_mailbox(&mut imap_session);
        let inbox = imap_session.search("ALL").unwrap();
        assert_eq!(inbox.len(), 0);
        let sync_job = job_for(mailboxes::SyncAction::Sync);
        let mailbox = imap_session.select(mbox).unwrap();
        push(&mut imap_session, &sync_job, maildir, mailbox, state).unwrap();

        let (maildir, state) = reopen();
        assert_eq!(maildir.count_new(), 1);
        let inbox = imap_session.uid_search("ALL").unwrap();
        assert_eq!(inbox, state.uids().into_iter().collect());

        // sync: one new mail on each side.
        maildir
            .store_cur_with_flags(&email.formatted(), "S")
            .unwrap();
        let state = state::SyncState::load(&maildir.path()).unwrap();
        assert_eq!(state.local_changes.added.len(), 1);
        imap_session
            .append(mbox, &email.formatted())
            .finish()
            .unwrap();
        let mailbox = imap_session.select(mbox).unwrap();
        sync(&mut imap_session, &sync_job, maildir, mailbox, state).unwrap();

        let (maildir, state) = reopen();
        assert_eq!(state.local_changes.added.len(), 0);
        assert_eq!(maildir.count_new() + maildir.count_cur(), 3);
        let inbox = imap_session.uid_search("ALL").unwrap();
        assert_eq!(inbox, state.uids().into_iter().collect());
    }
}