use imap::extensions::list_status::ExtendedNames;
use imap::ClientBuilder;
use imap::Session;
use log::{debug, error, info, trace};
use maildir::Maildir;
use native_tls::TlsConnector;
use std::collections::BTreeSet;
use std::fs;
use std::io;
use std::iter::Iterator;
use std::path::PathBuf;
use std::thread;
use std::time::Instant;
use thiserror::Error;
use vomit::{Mailbox, VomitError};
use wildmatch::WildMatch;
mod flags;
mod mailboxes;
mod seqset;
mod state;
mod sync;
/// All errors that the sync entry points in this file can return.
///
/// Variants carrying `#[from]` are produced automatically by the `?` operator
/// from the wrapped error type.
#[derive(Error, Debug)]
pub enum SyncError {
/// An error reported by the `imap` crate (connecting, logging in, running commands).
#[error("IMAP error: {0}")]
ProtocolError(#[from] imap::Error),
/// The supplied options are invalid, e.g. the port in `remote` does not parse.
#[error("invalid configuration: {0}")]
ConfigError(&'static str),
/// The planned sync contains mailbox deletions and was therefore refused.
#[error("Refused to perform dangerous action - set 'force' option to override")]
DangerousActionError(),
/// Loading (or saving) sync state failed.
#[error("failed to load state: {0}")]
StateError(#[from] state::StateError),
/// A sync job could not be sent over the job channel to the workers.
#[error("IPC error: {0}")]
IPCError(#[from] spmc::SendError<mailboxes::SyncJob>),
/// A filesystem operation failed, e.g. removing a local mailbox directory.
#[error("error accessing maildir: {0}")]
IOError(#[from] io::Error),
/// An error reported by the `maildir` crate.
#[error("error managing maildir: {0}")]
MaildirError(#[from] maildir::MaildirError),
/// An error from the `mailboxes` module; its message is shown unchanged.
#[error("{0}")]
MailboxesError(#[from] mailboxes::MailboxesError),
/// An error reported by the `vomit` crate.
#[error("error in maildir abstraction library: {0}")]
VomitError(#[from] VomitError),
/// An ad-hoc error with a static message.
#[error("{0}")]
Error(&'static str),
/// An ad-hoc error with a message built at runtime.
#[error("{0}")]
E(String),
}
/// Selects which side is treated as authoritative during a sync run.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum SyncDirection {
/// Remote to local: mailboxes existing only locally are scheduled for local
/// deletion, mailboxes existing only remotely are created locally.
Pull,
/// Local to remote: mailboxes existing only locally are created remotely,
/// mailboxes existing only remotely are scheduled for remote deletion.
Push,
/// Both directions: for a mailbox present on one side only, the recorded root
/// state decides whether it is created on the other side or deleted.
TwoWay,
}
/// User-supplied settings for a sync run.
#[derive(Clone, Debug)]
pub struct SyncOptions {
/// Root directory of the local maildir tree.
pub local: String,
/// IMAP server as `host` or `host:port`; the port defaults to 993 when omitted.
pub remote: String,
/// User name used for the IMAP login.
pub user: String,
/// Password used for the IMAP login.
pub password: String,
/// Upper bound on the number of threads (main thread included) that sync mailboxes.
pub threads: u8,
/// When set, invalid TLS certificates and host names are accepted.
pub unsafe_tls: bool,
/// When set, the planned per-mailbox actions are only logged; nothing is synced.
pub list_mailbox_actions: bool,
/// Wildcard patterns; if non-empty, only mailboxes matching one of them are considered.
pub include: Vec<String>,
/// Wildcard patterns; mailboxes matching any of them are skipped.
pub exclude: Vec<String>,
/// Per the `DangerousActionError` message, setting this is meant to permit
/// actions that are otherwise refused as dangerous (mailbox deletions).
pub force: bool,
}
/// Evaluates the expression `$body`, logs how long it took at debug level as
/// "`$label` in `N`ms", and yields the value of `$body`.
///
/// The label is only evaluated as part of the `debug!` invocation, i.e. after
/// `$body` has finished.
macro_rules! measure {
    ( $label:expr, $body:expr ) => {{
        let started = Instant::now();
        let value = $body;
        let elapsed = started.elapsed();
        debug!("{} in {}ms", $label, elapsed.as_millis());
        value
    }};
}
/// Status data items passed to `list_status`: only `HIGHESTMODSEQ` is requested.
const DATA_ITEMS_HMS: &str = "(HIGHESTMODSEQ)";
/// Determines the single hierarchy delimiter used by the listed mailboxes.
///
/// Collects the distinct delimiters reported by all entries in `names`
/// (entries without a delimiter are ignored).
///
/// # Errors
///
/// Returns `SyncError::E` unless exactly one distinct delimiter was found;
/// the message lists the delimiters that were seen.
fn get_hierarchy_delimiter(names: &ExtendedNames) -> Result<String, SyncError> {
    let delims: BTreeSet<&str> = names
        .iter()
        .filter_map(|(name, _)| name.delimiter())
        .collect();

    // Look at the first two distinct delimiters: exactly one must exist.
    let mut candidates = delims.iter();
    match (candidates.next(), candidates.next()) {
        (Some(delimiter), None) => Ok(String::from(*delimiter)),
        _ => Err(SyncError::E(format!(
            "Expected exactly one hierarchy delimiter, found {:?}",
            delims
        ))),
    }
}
/// Connects to the IMAP server over TLS, logs in and enables QRESYNC.
///
/// `opts.remote` is split at its last `:` into host and port; when there is
/// no `:` the port defaults to 993. With `opts.unsafe_tls` set, the TLS
/// connector accepts invalid certificates and host names; otherwise the
/// `imap` crate's default native-tls setup is used.
///
/// # Errors
///
/// * `SyncError::ConfigError` if the port cannot be parsed.
/// * `SyncError::ProtocolError` if connecting, the TLS handshake, the login
///   or the `ENABLE QRESYNC` command fails.
fn new_session(
    opts: &SyncOptions,
) -> Result<Session<impl std::io::Read + std::io::Write>, SyncError> {
    let (host, port) = match opts.remote.rsplit_once(':') {
        Some((host, port)) => (host, port),
        None => (opts.remote.as_str(), "993"),
    };
    let port = match port.parse() {
        Ok(p) => p,
        Err(e) => {
            error!("failed to parse remote port: {}", e);
            return Err(SyncError::ConfigError("invalid port"));
        }
    };
    debug!("Connecting to {}:{}", host, port);
    let client = if opts.unsafe_tls {
        ClientBuilder::new(host, port).connect(|domain, tcp| {
            let ssl_conn = TlsConnector::builder()
                .danger_accept_invalid_certs(true)
                .danger_accept_invalid_hostnames(true)
                .build()?;
            // A failed handshake is an expected runtime condition (server down,
            // protocol mismatch, ...): report it instead of panicking.
            Ok(TlsConnector::connect(&ssl_conn, domain, tcp)?)
        })?
    } else {
        ClientBuilder::new(host, port).native_tls()?
    };
    debug!("Logging in as {}", &opts.user);
    // `login` hands back the client alongside the error on failure; only the
    // error part is of interest here.
    let mut session = client.login(&opts.user, &opts.password).map_err(|e| e.0)?;
    session.run_command_and_read_response("ENABLE QRESYNC")?;
    Ok(session)
}
/// Opens a session, fetches the server's capabilities and verifies that every
/// extension this tool relies on is advertised.
///
/// All advertised capabilities are logged at trace level. The session is
/// logged out before the capabilities are inspected.
///
/// # Errors
///
/// Returns `SyncError::E` naming the first missing extension (checked in the
/// order QRESYNC, UIDPLUS, LIST-STATUS), or any error from establishing the
/// session, fetching the capabilities or logging out.
fn check_server_capabilities_(opts: &SyncOptions) -> Result<(), SyncError> {
    // Required capability paired with the complaint raised when it is missing.
    const REQUIRED: [(&str, &str); 3] = [
        ("QRESYNC", "Server does not support QRESYNC (RFC 7162)"),
        ("UIDPLUS", "Server does not support UIDPLUS (RFC 4315)"),
        (
            "LIST-STATUS",
            "Server does not support LIST-STATUS (RFC 5819)",
        ),
    ];

    let mut session = new_session(opts)?;
    let caps = session.capabilities()?;
    caps.iter()
        .for_each(|cap| trace!("Server capability: {:?}", cap));
    session.logout()?;

    for &(capability, complaint) in REQUIRED.iter() {
        if !caps.has_str(capability) {
            return Err(SyncError::E(complaint.to_string()));
        }
    }
    Ok(())
}
/// Logs the hierarchy delimiter and the names of all remote and local
/// mailboxes at info level.
///
/// # Errors
///
/// Returns `SyncError::Error` if the server lists no mailboxes, and otherwise
/// any error from opening the session, listing the mailboxes, determining the
/// delimiter or loading the local mailboxes.
pub fn list_mailboxes(opts: &SyncOptions) -> Result<(), SyncError> {
    let mut session = new_session(opts)?;
    let listing = session.list_status(None, Some("*"), DATA_ITEMS_HMS)?;
    if listing.is_empty() {
        return Err(SyncError::Error("No remote mailboxes found"));
    }

    let delimiter = get_hierarchy_delimiter(&listing)?;
    let remote = mailboxes::load_remote(&listing);
    let local = mailboxes::load_local(&opts.local, &delimiter)?;

    info!("Hierarchy delimiter: {}", delimiter);
    info!("Remote mailboxes:");
    remote.keys().for_each(|mailbox| info!(" {}", mailbox));
    info!("Local mailboxes:");
    local.keys().for_each(|mailbox| info!(" {}", mailbox));
    Ok(())
}
/// Verifies that the server supports the required IMAP extensions and logs
/// how long the check took at debug level.
///
/// # Errors
///
/// Forwards whatever `check_server_capabilities_` returns.
pub fn check_server_capabilities(opts: &SyncOptions) -> Result<(), SyncError> {
    let remote = &opts.remote;
    measure!(
        format!("Checked capabilities for {}", remote),
        check_server_capabilities_(opts)
    )
}
/// Executes one sync job on the given IMAP session.
///
/// Delete actions remove the local directory or the remote mailbox and return
/// right away. For every other action the local maildir directories are
/// created if needed, the sync state is loaded, the remote mailbox is
/// selected and the messages are synced according to `direction`.
///
/// # Errors
///
/// Errors from the mailbox-level action, maildir creation, state loading and
/// `SELECT` are returned as their respective `SyncError` variants. Errors
/// from the actual message sync are wrapped in `SyncError::E`, prefixed with
/// the mailbox name.
fn sync_mailbox<T: io::Read + io::Write>(
    maildir_root: &str,
    sync_job: &mailboxes::SyncJob,
    direction: &SyncDirection,
    session: &mut Session<T>,
) -> Result<(), SyncError> {
    let mailbox = &sync_job.name;
    let dirname = Mailbox::virtual_to_dir(mailbox, &sync_job.delimiter);
    let dir_component: &str = &dirname;
    let mut mbpath = PathBuf::from(maildir_root);
    mbpath.push(dir_component);

    match sync_job.action {
        mailboxes::SyncAction::DeleteLocal => {
            trace!("Deleting local mailbox {}", mailbox);
            fs::remove_dir_all(&mbpath)?;
            return Ok(());
        }
        mailboxes::SyncAction::DeleteRemote => {
            trace!("Deleting remote mailbox {}", mailbox);
            session.delete(mailbox)?;
            return Ok(());
        }
        mailboxes::SyncAction::CreateRemote => {
            trace!("Creating remote mailbox {}", mailbox);
            session.create(mailbox)?;
        }
        mailboxes::SyncAction::CreateLocal => {
            // The directories themselves are created by `create_dirs` below.
            trace!("Creating local mailbox {}", mailbox);
        }
        mailboxes::SyncAction::Sync => {}
    }

    let maildir = Maildir::from(mbpath);
    maildir.create_dirs()?;
    let state = measure!(
        format!("Loaded state for {}", mailbox),
        state::SyncState::load(&maildir.path())?
    );
    let remote = session.select(mailbox)?;

    let outcome = match direction {
        SyncDirection::Pull => sync::pull(session, sync_job, maildir, remote, state),
        SyncDirection::Push => sync::push(session, sync_job, maildir, remote, state),
        SyncDirection::TwoWay => sync::sync(session, sync_job, maildir, remote, state),
    };
    outcome.map_err(|e| SyncError::E(format!("{}: {}", mailbox, e)))
}
/// Body of a sync worker: opens its own IMAP session and syncs jobs received
/// from `rx` until `recv` reports an error (presumably because the sending
/// side was dropped), then logs out.
///
/// `i` is only used to identify the worker in log output.
///
/// # Errors
///
/// Returns the first error from opening the session, syncing a mailbox or
/// logging out. A failed mailbox sync ends the worker immediately, without
/// logging out.
fn worker_thread(
    i: u8,
    opts: &SyncOptions,
    direction: SyncDirection,
    rx: spmc::Receiver<mailboxes::SyncJob>,
) -> Result<(), SyncError> {
    let mut imap_session = new_session(opts)?;
    loop {
        let job = match rx.recv() {
            Ok(received) => received,
            Err(_) => break,
        };
        trace!("Syncing {} in worker thread {}", job.name, i);
        measure!(
            format!("Synced {}", job.name),
            sync_mailbox(&opts.local, &job, &direction, &mut imap_session)?
        );
    }
    imap_session.logout()?;
    Ok(())
}
/// Plans and executes a complete sync run in the given `direction`.
///
/// Steps:
/// 1. Load the root state, open an IMAP session and list all remote mailboxes
///    together with their `HIGHESTMODSEQ`.
/// 2. Build one sync job per mailbox in the union of local and remote
///    mailboxes, honouring the include/exclude patterns. Mailboxes present on
///    both sides are skipped when they have no local changes and the remote
///    `HIGHESTMODSEQ` equals the last one seen.
/// 3. If `opts.list_mailbox_actions` is set, only log the planned actions.
/// 4. Unless `opts.force` is set, refuse to continue when the plan contains
///    mailbox deletions.
/// 5. Save the root state, then process the jobs on up to `opts.threads`
///    threads; the calling thread takes part using the already open session.
///
/// # Errors
///
/// * `SyncError::Error` if the server lists no mailboxes.
/// * `SyncError::DangerousActionError` if deletions are planned and
///   `opts.force` is not set.
/// * Any error from loading/saving state, the IMAP session, queueing jobs or
///   syncing a mailbox on the calling thread.
///
/// Errors inside spawned worker threads are logged but do not change the
/// return value.
///
/// # Panics
///
/// Panics if a worker thread panicked.
fn sync_(opts: &SyncOptions, direction: SyncDirection) -> Result<(), SyncError> {
    info!("Syncing from {}", opts.remote);
    let mut root_state = state::RootState::load(&opts.local)?;
    let mut imap_session = new_session(opts)?;
    let names = imap_session.list_status(None, Some("*"), DATA_ITEMS_HMS)?;
    if names.is_empty() {
        return Err(SyncError::Error("No remote mailboxes found, giving up"));
    }

    let includes: Vec<WildMatch> = opts.include.iter().map(|p| WildMatch::new(p)).collect();
    let excludes: Vec<WildMatch> = opts.exclude.iter().map(|p| WildMatch::new(p)).collect();
    let delimiter = get_hierarchy_delimiter(&names)?;

    let remote_mailboxes_with_state = mailboxes::load_remote(&names);
    let remote_mailboxes: BTreeSet<&String> = remote_mailboxes_with_state.keys().collect();
    let local_mailboxes_with_state = measure!(
        "Loaded local mailboxes",
        mailboxes::load_local(&opts.local, &delimiter)?
    );
    let local_mailboxes: BTreeSet<&String> = local_mailboxes_with_state.keys().collect();
    trace!("local: {:?}", local_mailboxes);
    trace!("remote: {:?}", remote_mailboxes);

    let only_local: BTreeSet<&String> = local_mailboxes
        .difference(&remote_mailboxes)
        .cloned()
        .collect();
    trace!("Only local: {:?}", only_local);
    let only_remote: BTreeSet<&String> = remote_mailboxes
        .difference(&local_mailboxes)
        .cloned()
        .collect();
    trace!("Only remote: {:?}", only_remote);

    // Decide, per mailbox, what (if anything) has to be done.
    let sync_jobs: Vec<mailboxes::SyncJob> = local_mailboxes
        .union(&remote_mailboxes)
        .cloned()
        .filter_map(|name| {
            if !includes.is_empty() && !includes.iter().any(|e| e.matches(name)) {
                trace!("Skipping {} due to include filter", name);
                return None;
            }
            if excludes.iter().any(|e| e.matches(name)) {
                trace!("Skipping {} due to exclude filter", name);
                return None;
            }
            trace!("Processing mailbox {}", name);
            let delimiter = delimiter.clone();
            let name = name.clone();
            if only_local.contains(&name) {
                match direction {
                    SyncDirection::Pull => {
                        mailboxes::sync_delete_local(name, delimiter, &mut root_state)
                    }
                    SyncDirection::Push => {
                        mailboxes::sync_create_remote(name, delimiter, &mut root_state)
                    }
                    SyncDirection::TwoWay => {
                        // A mailbox already recorded in the root state but now
                        // missing remotely is treated as deleted on the
                        // server; an unrecorded one is treated as new locally.
                        let dirname = Mailbox::virtual_to_dir(&name, &delimiter);
                        if root_state.subdirs.contains(&dirname) {
                            mailboxes::sync_delete_local(name, delimiter, &mut root_state)
                        } else {
                            mailboxes::sync_create_remote(name, delimiter, &mut root_state)
                        }
                    }
                }
            } else if only_remote.contains(&name) {
                match direction {
                    SyncDirection::Pull => {
                        mailboxes::sync_create_local(name, delimiter, &mut root_state)
                    }
                    SyncDirection::Push => {
                        mailboxes::sync_delete_remote(name, delimiter, &mut root_state)
                    }
                    SyncDirection::TwoWay => {
                        // Mirror image of the local-only case above.
                        let dirname = Mailbox::virtual_to_dir(&name, &delimiter);
                        if root_state.subdirs.contains(&dirname) {
                            mailboxes::sync_delete_remote(name, delimiter, &mut root_state)
                        } else {
                            mailboxes::sync_create_local(name, delimiter, &mut root_state)
                        }
                    }
                }
            } else {
                // Neither local-only nor remote-only, so both maps have an entry.
                let state = local_mailboxes_with_state
                    .get(&name)
                    .expect("mailbox present on both sides must have local state");
                let modseq = remote_mailboxes_with_state
                    .get(&name)
                    .expect("mailbox present on both sides must have remote state");
                if !state.has_local_changes() && state.last_seen.highest_mod_seq == *modseq {
                    trace!("Skipping {} because HIGHESTMODSEQ is in sync", name);
                    return None;
                }
                mailboxes::sync(name, delimiter)
            }
        })
        .collect();

    if opts.list_mailbox_actions {
        info!("The following actions would be performed:");
        for job in sync_jobs {
            info!(" {}: {:?}", job.name, job.action);
        }
        return Ok(());
    }

    // Deleting whole mailboxes is only done when explicitly requested via the
    // `force` option. Every refused deletion is reported, not just the first.
    if !opts.force {
        let mut refused = false;
        for job in &sync_jobs {
            match job.action {
                mailboxes::SyncAction::DeleteLocal => {
                    error!("Refusing to delete local mailbox {}", job.name);
                    refused = true;
                }
                mailboxes::SyncAction::DeleteRemote => {
                    error!("Refusing to delete remote mailbox {}", job.name);
                    refused = true;
                }
                mailboxes::SyncAction::Sync
                | mailboxes::SyncAction::CreateLocal
                | mailboxes::SyncAction::CreateRemote => {}
            }
        }
        if refused {
            return Err(SyncError::DangerousActionError());
        }
    }

    root_state.save()?;
    drop(root_state);

    // Never use more threads than there are jobs.
    let thread_count = if sync_jobs.len() < opts.threads.into() {
        sync_jobs.len().try_into().unwrap()
    } else {
        opts.threads
    };
    let mut threads = Vec::new();
    let (mut tx, rx) = spmc::channel::<mailboxes::SyncJob>();
    info!(
        "Using {} threads to sync {} mailboxes",
        thread_count,
        sync_jobs.len()
    );

    // The calling thread acts as one of the workers (see the receive loop
    // below), so only `thread_count - 1` additional threads are spawned.
    for i in 1..thread_count {
        let rx = rx.clone();
        let opts = (*opts).clone();
        let dir = direction.clone();
        let t = thread::spawn(move || {
            if let Err(e) = worker_thread(i, &opts, dir, rx) {
                error!("Error in worker thread {}: {}", i, e);
            };
        });
        threads.push(t);
    }

    for job in sync_jobs {
        tx.send(job)?;
    }
    // Dropping the sender lets the receive loops end once the queue is empty.
    drop(tx);

    while let Ok(job) = rx.recv() {
        debug!("Syncing {} in main thread ({:?})", job.name, job.action);
        measure!(
            format!("Synced {}", job.name),
            sync_mailbox(&opts.local, &job, &direction, &mut imap_session)?
        );
    }
    // Best effort: a failing logout does not invalidate the completed sync.
    _ = imap_session.logout();
    for t in threads {
        t.join().unwrap();
    }
    info!("Sync successful");
    Ok(())
}
/// Runs a sync in `SyncDirection::Pull` mode and logs the total duration at
/// debug level.
///
/// # Errors
///
/// Forwards whatever `sync_` returns.
pub fn pull(opts: &SyncOptions) -> Result<(), SyncError> {
    let remote = &opts.remote;
    measure!(
        format!("Pulled from {}", remote),
        sync_(opts, SyncDirection::Pull)
    )
}
/// Runs a sync in `SyncDirection::Push` mode and logs the total duration at
/// debug level.
///
/// # Errors
///
/// Forwards whatever `sync_` returns.
pub fn push(opts: &SyncOptions) -> Result<(), SyncError> {
    let remote = &opts.remote;
    measure!(
        format!("Pushed to {}", remote),
        sync_(opts, SyncDirection::Push)
    )
}
/// Runs a sync in `SyncDirection::TwoWay` mode and logs the total duration at
/// debug level.
///
/// # Errors
///
/// Forwards whatever `sync_` returns.
pub fn sync(opts: &SyncOptions) -> Result<(), SyncError> {
    let remote = &opts.remote;
    measure!(
        format!("Synced with {}", remote),
        sync_(opts, SyncDirection::TwoWay)
    )
}