vomit_m2sync/
lib.rs

1//! # vomit-m2sync
2//!
3//! `vomit-m2sync` provides full two-way synchronization between IMAP and
4//! a local [m2dir][4]. At the moment, it has to be called periodically to
5//! keep the two synchronized.
6//!
7//! [4]: https://sr.ht/~bitfehler/m2dir
8//!
9//! It uses the [log][1] crate for logging, so you can receive logs from it by
10//! using any of the compatible logging libraries.
11//!
12//! [1]: https://crates.io/crates/log
13//!
//! [m2sync][2] is a small CLI wrapper around `vomit-m2sync`.
15//!
16//! [2]: https://crates.io/crates/m2sync
17//!
18//! As the name implies, `vomit-m2sync` is part of the [vomit project][3].
19//!
20//! [3]: https://sr.ht/~bitfehler/vomit
21
22use imap::extensions::list_status::ExtendedNames;
23use imap::ClientBuilder;
24use imap::ImapConnection;
25use imap::Session;
26use log::warn;
27use log::{debug, error, info, trace};
28use std::collections::BTreeSet;
29use std::fs;
30use std::io;
31use std::iter;
32use std::iter::Iterator;
33use std::path::PathBuf;
34use std::sync::atomic::AtomicBool;
35use std::sync::atomic::Ordering;
36use std::sync::mpsc;
37use std::sync::Arc;
38use std::thread;
39use std::time::Instant;
40use wildmatch::WildMatch;
41
42use crate::state::SyncAction;
43use crate::state::SyncJob;
44
45mod flags;
46mod mailboxes;
47mod seqset;
48mod state;
49mod sync;
50
/// Errors returned by the functions of this crate.
#[derive(thiserror::Error, Debug)]
pub enum Error {
    /// An error reported by the `imap` crate; converted automatically via `From`.
    #[error("IMAP error: {0}")]
    ProtocolError(#[from] imap::Error),
    /// The server did not provide a UIDVALIDITY value.
    #[error("server is incompatible: no UIDVALIDITY provided")]
    ServerIncompatible(),
    /// An APPEND command did not return any UIDs.
    #[error("APPEND did not return any UIDs")]
    InvalidAppendResponse(),
    /// The server did not send a UID where one was required.
    #[error("server did not send UID, giving up")]
    MissingUID(),
    /// A downloaded message had no body.
    #[error("downloaded message has no body")]
    MissingBody(),
    /// The sync options are invalid, e.g. the port in `remote` cannot be parsed.
    #[error("invalid configuration: {0}")]
    ConfigError(&'static str),
    /// Mailboxes would have to be deleted, but the `force` option is not set.
    #[error("refused to perform dangerous action - set 'force' option to override")]
    DangerousAction(),
    /// The server lacks a required capability; the payload names it.
    #[error("server does not support {0}")]
    MissingCapability(&'static str),
    /// The mailbox listing did not yield exactly one hierarchy delimiter.
    ///
    /// NOTE(review): despite the message, this is also returned when the
    /// listing reports no delimiter at all.
    #[error("found multiple hierarchy delimiters")]
    MultipleHierarchyDelimiters(),
    /// The server did not list any mailboxes.
    #[error("no remote mailboxes found")]
    NoRemoteMailboxes(),
    /// The UID validity changed while local changes were still pending.
    #[error("UID validity change with pending local changes makes resynchronization impossible")]
    UIDValidityChangeWithLocalChanges(),
    /// At least one mailbox failed to sync, or not every job reported a result.
    #[error("not all mailboxes were synced successfully")]
    SyncIncomplete(),
    /// An error from the `state` module; converted automatically via `From`.
    #[error("failed to load state: {0}")]
    StateError(#[from] state::Error),
    /// A sync job could not be sent over a channel; converted automatically via `From`.
    #[error("IPC error: {0}")]
    IPCError(#[from] spmc::SendError<state::SyncJob>),
    /// A local I/O error; converted automatically via `From`.
    #[error("error accessing maildir: {0}")]
    IOError(#[from] io::Error),
    /// The operation was interrupted.
    ///
    /// NOTE(review): not raised in this file; presumably returned by the `sync`
    /// module once the abort flag is set - confirm there.
    #[error("interrupted")]
    Interrupted(),
    // #[error("{0}")]
    // MailboxesError(#[from] mailboxes::MailboxesError),
}
88
/// The direction in which changes are propagated during a sync run.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum SyncDirection {
    /// Apply remote (IMAP) changes to the local side; used by `pull`.
    Pull,
    /// Apply local changes to the remote (IMAP) side; used by `push`.
    Push,
    /// Synchronize changes in both directions; used by `sync`.
    TwoWay,
}
95
/// A set of options to control the synchronization process
#[derive(Clone, Debug)]
pub struct SyncOptions {
    /// A unique ID that must remain constant for all invocations that sync the same targets,
    /// but must be unique for each set of targets. This can be considered an "account ID". It
    /// can be used to allow syncing the same local directory to different remote accounts.
    pub uid: String,
    /// The local maildir to sync to
    pub local: String,
    /// The IMAP server (and optional port) to sync from
    ///
    /// Given as `host` or `host:port`; without a port, 993 is used.
    pub remote: String,
    /// The user for IMAP authentication
    pub user: String,
    /// The password for IMAP authentication
    pub password: String,
    /// The number of threads to use
    ///
    /// Each thread will use its own IMAP session, so this must not be greater
    /// than the number of concurrent user sessions allowed by the IMAP server.
    pub threads: u8,
    /// Disable TLS certificate checks (e.g. for self-signed certs) (insecure)
    pub unsafe_tls: bool,
    /// Completely disable TLS (very insecure)
    ///
    /// Takes precedence over `unsafe_tls` when both are set.
    pub disable_tls: bool,
    /// Only log with info level what actions (sync, create, delete) would be taken on
    /// which mailboxes (taking include/exclude into account), then exit.
    pub list_mailbox_actions: bool,
    /// A list of wildcard patterns to include only folders that match any of them.
    ///
    /// The wildcard pattern only supports `?` and `*` and must match the full mailbox name.
    pub include: Vec<String>,
    /// A list of wildcard patterns to exclude all folders that match any of them.
    ///
    /// The wildcard pattern only supports `?` and `*` and must match the full mailbox name.
    /// If used together with `include` and both match, `exclude` takes precedence.
    pub exclude: Vec<String>,
    /// Confirm execution of potentially dangerous actions (e.g. deleting mailboxes)
    pub force: bool,
}
135
/// Evaluates `$x`, logs `"<$m> in <elapsed>ms"` at debug level and yields the
/// value of `$x`.
///
/// `$m` is only handed to `debug!`, so it is up to the logging macro whether
/// the message expression is evaluated at all.
macro_rules! measure {
    ( $m:expr, $x:expr ) => {{
        let started = Instant::now();
        let outcome = $x;
        // Stop the clock before logging so the log call itself is not measured.
        let elapsed = started.elapsed();
        debug!("{} in {}ms", $m, elapsed.as_millis());
        outcome
    }};
}
145
/// Status data items requested for every mailbox when listing with
/// `list_status`: only the HIGHESTMODSEQ value.
const DATA_ITEMS_HMS: &str = "(HIGHESTMODSEQ)";
147
148fn get_hierarchy_delimiter(names: &ExtendedNames) -> Result<String, Error> {
149    let delims: BTreeSet<&str> = names
150        .iter()
151        .filter_map(|(name, _)| name.delimiter())
152        .collect();
153    if delims.len() != 1 {
154        return Err(Error::MultipleHierarchyDelimiters());
155    }
156    Ok(String::from(delims.into_iter().next().unwrap()))
157}
158
159fn new_session(opts: &SyncOptions) -> Result<Session<Box<dyn ImapConnection>>, Error> {
160    let (host, port) = match opts.remote.rsplit_once(':') {
161        Some((host, port)) => (host, port),
162        None => (opts.remote.as_str(), "993"),
163    };
164
165    let port = match port.parse() {
166        Ok(p) => p,
167        Err(e) => {
168            error!("failed to parse remote port: {}", e);
169            return Err(Error::ConfigError("invalid port"));
170        }
171    };
172
173    debug!("Connecting to {}:{}", host, port);
174
175    let client = if opts.disable_tls {
176        ClientBuilder::new(host, port)
177            .mode(imap::ConnectionMode::Plaintext)
178            .connect()?
179    } else if opts.unsafe_tls {
180        ClientBuilder::new(host, port)
181            .danger_skip_tls_verify(true)
182            .connect()?
183    } else {
184        ClientBuilder::new(host, port).connect()?
185    };
186
187    debug!("Logging in as {}", &opts.user);
188    let mut session = client.login(&opts.user, &opts.password).map_err(|e| e.0)?;
189
190    session.run_command_and_read_response("ENABLE QRESYNC")?;
191
192    Ok(session)
193}
194
195fn check_server_capabilities_(opts: &SyncOptions) -> Result<(), Error> {
196    let mut session = new_session(opts)?;
197    let caps = session.capabilities()?;
198    for cap in caps.iter() {
199        trace!("Server capability: {:?}", cap);
200    }
201    session.logout()?;
202    if !caps.has_str("QRESYNC") {
203        return Err(Error::MissingCapability("QRESYNC (RFC 7162)"));
204    }
205    if !caps.has_str("UIDPLUS") {
206        return Err(Error::MissingCapability("UIDPLUS (RFC 4315)"));
207    }
208    if !caps.has_str("LIST-STATUS") {
209        return Err(Error::MissingCapability("LIST-STATUS (RFC 5819)"));
210    }
211    Ok(())
212}
213
214/// List remote and local mailboxes with log level "info".
215///
216/// This is intended for human consumption. It can e.g. be presented to the
217/// user as a verification that all mailboxes are found as expected.
218pub fn list_mailboxes(opts: &SyncOptions) -> Result<(), Error> {
219    let mut session = new_session(opts)?;
220    let names = session.list_status(None, Some("*"), DATA_ITEMS_HMS)?;
221    if names.is_empty() {
222        return Err(Error::NoRemoteMailboxes());
223    }
224
225    let delimiter = get_hierarchy_delimiter(&names)?;
226    let remote_mailboxes = state::load_remote_mailboxes(&names);
227    let local_mailboxes = state::load_local_mailboxes(&opts.uid, &opts.local, &delimiter)?;
228
229    info!("Hierarchy delimiter: {}", delimiter);
230    info!("Remote mailboxes:");
231    for m in remote_mailboxes.keys() {
232        info!("  {}", m);
233    }
234    info!("Local mailboxes:");
235    for m in local_mailboxes.keys() {
236        info!("  {}", m);
237    }
238    Ok(())
239}
240
241/// Check if the configured server supports the required IMAP capabilities
242///
243/// Currently, at least the QRESYNC capability is required ([RFC 7162][1]).
244///
245/// [1]: https://www.rfc-editor.org/rfc/rfc7162.html
246pub fn check_server_capabilities(opts: &SyncOptions) -> Result<(), Error> {
247    measure!(
248        format!("Checked capabilities for {}", opts.remote),
249        check_server_capabilities_(opts)
250    )
251}
252
253fn sync_mailbox<T: io::Read + io::Write>(
254    state_id: &str,
255    maildir_root: &str,
256    sync_job: &state::SyncJob,
257    direction: &SyncDirection,
258    session: &mut Session<T>,
259    abort: &Arc<AtomicBool>,
260) -> Result<(), Error> {
261    let mailbox = &sync_job.name;
262    let dirname = state::mailbox_to_dirname(mailbox, &sync_job.delimiter);
263    let mbpath: PathBuf = [maildir_root, &dirname].iter().collect();
264
265    match sync_job.action {
266        state::SyncAction::Sync => (),
267        state::SyncAction::CreateLocal => {
268            trace!("Creating local mailbox {}", mailbox);
269            // Local dir creation will happen automatically
270        }
271        state::SyncAction::CreateRemote => {
272            trace!("Creating remote mailbox {}", mailbox);
273            session.create(mailbox)?
274        }
275        state::SyncAction::DeleteLocal => {
276            trace!("Deleting local mailbox {}", mailbox);
277            return fs::remove_dir_all(&mbpath).map_err(Error::IOError);
278        }
279        state::SyncAction::DeleteRemote => {
280            trace!("Deleting remote mailbox {}", mailbox);
281            return session.delete(mailbox).map_err(Error::ProtocolError);
282        }
283    };
284
285    let state = measure!(
286        format!("Loaded state for {}", mailbox),
287        state::SyncState::load(&mbpath, state_id)?
288    );
289
290    let remote = session.select(mailbox)?;
291
292    match direction {
293        SyncDirection::Pull => sync::pull(session, sync_job, remote, state, abort),
294        SyncDirection::Push => sync::push(session, sync_job, remote, state, abort),
295        SyncDirection::TwoWay => sync::sync(session, sync_job, remote, state, abort),
296    }
297}
298
299fn worker_thread(
300    i: u8,
301    opts: &SyncOptions,
302    direction: SyncDirection,
303    rx: spmc::Receiver<state::SyncJob>,
304    tx: mpsc::Sender<state::SyncJob>,
305    session: Option<Session<Box<dyn ImapConnection>>>,
306    abort: Arc<AtomicBool>,
307) -> Result<(), Error> {
308    let mut imap_session = match session {
309        Some(s) => s,
310        None => new_session(opts)?,
311    };
312
313    while let Ok(mut job) = rx.recv() {
314        if abort.load(Ordering::Relaxed) {
315            warn!("Worker thread {} exiting - interrupted", i);
316            break;
317        };
318        trace!("Syncing {} in worker thread {}", job.name, i);
319        match measure!(
320            format!("Synced {}", job.name),
321            sync_mailbox(
322                &opts.uid,
323                &opts.local,
324                &job,
325                &direction,
326                &mut imap_session,
327                &abort
328            )
329        ) {
330            Ok(_) => job.success = true,
331            Err(e) => error!("Error syncing {}: {}", job.name, e),
332        }
333        tx.send(job)?;
334    }
335    drop(tx);
336    imap_session.logout()?;
337    Ok(())
338}
339
340fn sync_(
341    opts: &SyncOptions,
342    direction: SyncDirection,
343    abort: Arc<AtomicBool>,
344) -> Result<(), Error> {
345    info!("Syncing from {} to {}", opts.remote, opts.local);
346
347    fs::create_dir_all(&opts.local)?;
348
349    let root_state = state::RootState::load(&opts.local, &opts.uid)?;
350
351    let mut imap_session = new_session(opts)?;
352
353    let names = imap_session.list_status(None, Some("*"), DATA_ITEMS_HMS)?;
354
355    if names.is_empty() {
356        return Err(Error::NoRemoteMailboxes());
357    }
358
359    let includes: Vec<WildMatch> = opts.include.iter().map(|p| WildMatch::new(p)).collect();
360    let excludes: Vec<WildMatch> = opts.exclude.iter().map(|p| WildMatch::new(p)).collect();
361
362    let delimiter = get_hierarchy_delimiter(&names)?;
363    let remote_mailboxes_with_state = state::load_remote_mailboxes(&names);
364    let remote_mailboxes: BTreeSet<&String> = remote_mailboxes_with_state.keys().collect();
365    let local_mailboxes_with_state = measure!(
366        format!("Loaded local mailboxes"),
367        state::load_local_mailboxes(&opts.uid, &opts.local, &delimiter)?
368    );
369    let local_mailboxes: BTreeSet<&String> = local_mailboxes_with_state.keys().collect();
370
371    trace!("local: {:?}", local_mailboxes);
372    trace!("remote: {:?}", remote_mailboxes);
373
374    let only_local: BTreeSet<&String> = local_mailboxes
375        .difference(&remote_mailboxes)
376        .cloned()
377        .collect();
378    trace!("Only local: {:?}", only_local);
379    let only_remote: BTreeSet<&String> = remote_mailboxes
380        .difference(&local_mailboxes)
381        .cloned()
382        .collect();
383    trace!("Only remote: {:?}", only_remote);
384    let sync_jobs: Vec<state::SyncJob> = local_mailboxes
385        .union(&remote_mailboxes)
386        .cloned()
387        .filter_map(|name| {
388            // Apply user-provided include/exclude filter
389            if !includes.is_empty() && !includes.iter().any(|e| e.matches(name)) {
390                trace!("Skipping {} due to include filter", name);
391                return None;
392            }
393            if excludes.iter().any(|e| e.matches(name)) {
394                trace!("Skipping {} due to exclude filter", name);
395                return None;
396            }
397            trace!("Processing mailbox {}", name);
398            let delimiter = delimiter.clone();
399
400            // We'll have to go through the tedious process of determining what to do
401            // with mailboxes that are missing on one side.
402            let name = name.clone();
403            if only_local.contains(&name) {
404                match direction {
405                    SyncDirection::Pull => {
406                        // Delete locally, also from root state
407                        SyncJob::new(name, delimiter, SyncAction::DeleteLocal)
408                    }
409                    SyncDirection::Push => {
410                        // Create remotely, add to root state
411                        SyncJob::new(name, delimiter, SyncAction::CreateRemote)
412                    }
413                    SyncDirection::TwoWay => {
414                        let dirname = state::mailbox_to_dirname(&name, &delimiter);
415                        if root_state
416                            .contains_subdir(&dirname)
417                            .expect("failed to inspect current state")
418                        {
419                            // Exists locally only because removed on server side
420                            // Delete locally, also from root state
421                            SyncJob::new(name, delimiter, SyncAction::DeleteLocal)
422                        } else {
423                            // Exists locally only because created here
424                            // Create remotely, add to root state
425                            SyncJob::new(name, delimiter, SyncAction::CreateRemote)
426                        }
427                    }
428                }
429            } else if only_remote.contains(&name) {
430                match direction {
431                    SyncDirection::Pull => {
432                        // Create locally (already happening), add to root state (after sync?)
433                        SyncJob::new(name, delimiter, SyncAction::CreateLocal)
434                    }
435                    SyncDirection::Push => {
436                        // Delete remotely, also from root state
437                        SyncJob::new(name, delimiter, SyncAction::DeleteRemote)
438                    }
439                    SyncDirection::TwoWay => {
440                        let dirname = state::mailbox_to_dirname(&name, &delimiter);
441                        if root_state
442                            .contains_subdir(&dirname)
443                            .expect("failed to inspect current state")
444                        {
445                            // Exists remotely only because removed locally
446                            // Delete remotely, also from root state
447                            SyncJob::new(name, delimiter, SyncAction::DeleteRemote)
448                        } else {
449                            // Exists remotely only because created there
450                            // Create locally (already happening), add to root state (after sync?)
451                            SyncJob::new(name, delimiter, SyncAction::CreateLocal)
452                        }
453                    }
454                }
455            } else {
456                // Exists on both sides, just sync
457                let state = local_mailboxes_with_state.get(&name).unwrap();
458                let modseq = remote_mailboxes_with_state.get(&name).unwrap();
459                if !state.has_local_changes() && state.last_seen_highest_mod_seq() == *modseq {
460                    trace!("Skipping {} because HIGHESTMODSEQ is in sync", name);
461                    return None;
462                }
463                SyncJob::new(name, delimiter, state::SyncAction::Sync)
464            }
465        })
466        .collect();
467
468    if opts.list_mailbox_actions {
469        info!("The following actions would be performed:");
470        for job in sync_jobs {
471            info!("  {}: {:?}", job.name, job.action);
472        }
473        return Ok(());
474    }
475
476    let bail = sync_jobs
477        .iter()
478        .map(|job| match job.action {
479            state::SyncAction::DeleteLocal => {
480                warn!("About to delete local mailbox {}", job.name);
481                true
482            }
483            state::SyncAction::DeleteRemote => {
484                warn!("About to delete remote mailbox {}", job.name);
485                true
486            }
487            _ => false,
488        })
489        .any(|b| b);
490    if bail && !opts.force {
491        return Err(Error::DangerousAction());
492    }
493
494    let thread_count = if sync_jobs.len() < opts.threads.into() {
495        // Conversion must be safe here
496        sync_jobs.len().try_into().unwrap()
497    } else {
498        opts.threads
499    };
500
501    let (mut tx_work, rx_work) = spmc::channel::<state::SyncJob>();
502    let (tx_result, rx_result) = mpsc::channel::<state::SyncJob>();
503
504    info!(
505        "Using {} threads to sync {} mailboxes",
506        thread_count,
507        sync_jobs.len()
508    );
509
510    let mut sess = iter::once(imap_session);
511
512    let threads: Vec<_> = (0..thread_count)
513        .map(|i| {
514            let rx = rx_work.clone();
515            let tx = tx_result.clone();
516            let opts = (*opts).clone();
517            let dir = direction.clone();
518            let session = sess.next();
519            let abort = Arc::clone(&abort);
520
521            thread::spawn(move || {
522                if let Err(e) = worker_thread(i, &opts, dir, rx, tx, session, abort) {
523                    error!("Error in worker thread {}: {}", i, e);
524                };
525            })
526        })
527        .collect();
528
529    let expected_results = sync_jobs.len();
530    let mut received_results = 0usize;
531
532    for job in sync_jobs {
533        tx_work.send(job)?;
534    }
535
536    drop(tx_work);
537    drop(tx_result);
538
539    let mut success = true;
540
541    while let Ok(job) = rx_result.recv() {
542        // update root state if needed
543        trace!("root state update: {:?} {}", job.action, job.name);
544        success &= job.success;
545        received_results += 1;
546
547        root_state.sync_done(job)?;
548    }
549
550    for t in threads {
551        t.join().unwrap();
552    }
553
554    if success && (expected_results == received_results) {
555        info!("Sync successful");
556        Ok(())
557    } else {
558        error!("Sync failed!");
559        Err(Error::SyncIncomplete())
560    }
561}
562
563/// Apply all changes from IMAP to the local maildir
564///
565/// This overwrites any changes that may have happened on the local side.
566///
567/// <div class="warning">
568///
569/// Synchronizing local and remote state as quickly as possible requires an
570/// additional index, which itself has to be kept in sync with the actual
571/// storage. Doing this all atomically is next to impossible. Any caller of this
572/// function **should catch termination signals!** Setting the inner value of
573/// the `abort` argument to `true` will make the function return an error as
574/// early as possible, while still making sure the persisted state is
575/// consistent.
576///
577/// </div>
578///
579/// # Arguments
580///
581/// * `opts`: options controlling the synchronization process
582/// * `abort`: if the inner value becomes true at any time, the sync will abort
583///   as soon as possible without corrupting the index
584pub fn pull(opts: &SyncOptions, abort: Arc<AtomicBool>) -> Result<(), Error> {
585    measure!(
586        format!("Pulled from {}", opts.remote),
587        sync_(opts, SyncDirection::Pull, abort)
588    )
589}
590
591/// Apply all changes in local maildir to IMAP
592///
593/// This overwrites any changes that may have happened on the IMAP side.
594///
595/// <div class="warning">
596///
597/// Synchronizing local and remote state as quickly as possible requires an
598/// additional index, which itself has to be kept in sync with the actual
599/// storage. Doing this all atomically is next to impossible. Any caller of this
600/// function **should catch termination signals!** Setting the inner value of
601/// the `abort` argument to `true` will make the function return an error as
602/// early as possible, while still making sure the persisted state is
603/// consistent.
604///
605/// </div>
606///
607/// # Arguments
608///
609/// * `opts`: options controlling the synchronization process
610/// * `abort`: if the inner value becomes true at any time, the sync will abort
611///   as soon as possible without corrupting the index
612pub fn push(opts: &SyncOptions, abort: Arc<AtomicBool>) -> Result<(), Error> {
613    measure!(
614        format!("Pushed to {}", opts.remote),
615        sync_(opts, SyncDirection::Push, abort)
616    )
617}
618
619/// Synchronize local and remote changes between IMAP and local maildir
620///
621/// This includes fetching new mail, deleting expunged mails, updating tags,
622/// etc.
623///
624/// <div class="warning">
625///
626/// Synchronizing local and remote state as quickly as possible requires an
627/// additional index, which itself has to be kept in sync with the actual
628/// storage. Doing this all atomically is next to impossible. Any caller of this
629/// function **should catch termination signals!** Setting the inner value of
630/// the `abort` argument to `true` will make the function return an error as
631/// early as possible, while still making sure the persisted state is
632/// consistent.
633///
634/// </div>
635///
636/// # Arguments
637///
638/// * `opts`: options controlling the synchronization process
639/// * `abort`: if the inner value becomes true at any time, the sync will abort
640///   as soon as possible without corrupting the index
641pub fn sync(opts: &SyncOptions, abort: Arc<AtomicBool>) -> Result<(), Error> {
642    measure!(
643        format!("Synced with {}", opts.remote),
644        sync_(opts, SyncDirection::TwoWay, abort)
645    )
646}