vomit_m2sync/
lib.rs

1//! # vomit-m2sync
2//!
3//! `vomit-m2sync` provides full two-way synchronization between IMAP and
4//! a local [m2dir][4]. At the moment, it has to be called periodically to
5//! keep the two synchronized.
6//!
7//! [4]: https://sr.ht/~bitfehler/m2dir
8//!
9//! It uses the [log][1] crate for logging, so you can receive logs from it by
10//! using any of the compatible logging libraries.
11//!
12//! [1]: https://crates.io/crates/log
13//!
//! [m2sync][2] is a small CLI wrapper around `vomit-m2sync`.
15//!
16//! [2]: https://crates.io/crates/m2sync
17//!
18//! As the name implies, `vomit-m2sync` is part of the [vomit project][3].
19//!
20//! [3]: https://sr.ht/~bitfehler/vomit
21
22use imap::extensions::list_status::ExtendedNames;
23use imap::ClientBuilder;
24use imap::ImapConnection;
25use imap::Session;
26use log::warn;
27use log::{debug, error, info, trace};
28use std::collections::BTreeSet;
29use std::fs;
30use std::io;
31use std::iter;
32use std::iter::Iterator;
33use std::path::Path;
34use std::path::PathBuf;
35use std::sync::atomic::AtomicBool;
36use std::sync::atomic::Ordering;
37use std::sync::mpsc;
38use std::sync::Arc;
39use std::thread;
40use std::time::Instant;
41use wildmatch::WildMatch;
42
43use crate::state::SyncAction;
44use crate::state::SyncJob;
45
46mod flags;
47mod mailboxes;
48mod seqset;
49mod state;
50mod sync;
51
52#[derive(thiserror::Error, Debug)]
53pub enum Error {
54    #[error("IMAP error: {0}")]
55    IMAP(#[from] imap::Error),
56    #[error("no UIDVALIDITY provided")]
57    MissingUIDValidity(),
58    #[error("no HIGHESTMODSEQ provided")]
59    MissingHMS(),
60    #[error("APPEND did not return any UIDs")]
61    InvalidAppendResponse(),
62    #[error("server did not send UID, giving up")]
63    MissingUID(),
64    #[error("downloaded message has no body")]
65    MissingBody(),
66    #[error("invalid configuration: {0}")]
67    Config(&'static str),
68    #[error("refused to perform dangerous action - set 'force' option to override")]
69    DangerousAction(),
70    #[error("server does not support {0}")]
71    MissingCapability(&'static str),
72    #[error("found multiple hierarchy delimiters")]
73    MultipleHierarchyDelimiters(),
74    #[error("no remote mailboxes found")]
75    NoRemoteMailboxes(),
76    #[error("UID validity change with pending local changes makes resynchronization impossible")]
77    UIDValidityChangeWithLocalChanges(),
78    #[error("not all mailboxes were synced successfully")]
79    SyncIncomplete(),
80    #[error("failed to load state: {0}")]
81    State(#[from] state::Error),
82    #[error("IPC error: {0}")]
83    IPC(#[from] spmc::SendError<state::SyncJob>),
84    #[error("IO error: {0}")]
85    IO(#[from] io::Error),
86    #[error("interrupted")]
87    Interrupted(),
88}
89
/// The direction in which changes are propagated during a synchronization.
///
/// The type is a plain, fieldless enum and therefore `Copy`: it can be passed
/// around by value without cloning.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum SyncDirection {
    /// Apply changes from IMAP to the local m2dir.
    Pull,
    /// Apply changes from the local m2dir to IMAP.
    Push,
    /// Synchronize changes in both directions.
    TwoWay,
}
96
/// A set of options to control the synchronization process
///
/// The `Debug` representation deliberately replaces the password with a
/// placeholder so that logging the options never leaks the credential.
#[derive(Clone)]
pub struct SyncOptions {
    /// A unique ID that must remain constant for all invocations that sync the same targets,
    /// but must be unique for each set of targets. This can be considered an "account ID". It
    /// can be used to allow syncing the same local directory to different remote accounts.
    pub uid: String,
    /// The local m2dir to sync to
    pub local: String,
    /// The IMAP server (and optional port) to sync from
    pub remote: String,
    /// The user for IMAP authentication
    pub user: String,
    /// The password for IMAP authentication
    pub password: String,
    /// The number of threads to use
    ///
    /// Each thread will use its own IMAP session, so this must not be greater
    /// than the number of concurrent user sessions allowed by the IMAP server.
    pub threads: u8,
    /// Disable TLS certificate checks (e.g. for self-signed certs) (insecure)
    pub unsafe_tls: bool,
    /// Completely disable TLS (very insecure)
    pub disable_tls: bool,
    /// Only log with info level what actions (sync, create, delete) would be taken on
    /// which mailboxes (taking include/exclude into account), then exit.
    pub list_mailbox_actions: bool,
    /// A list of wildcard patterns to include only folders that match any of them.
    ///
    /// The wildcard pattern only supports `?` and `*` and must match the full mailbox name.
    pub include: Vec<String>,
    /// A list of wildcard patterns to exclude all folders that match any of them.
    ///
    /// The wildcard pattern only supports `?` and `*` and must match the full mailbox name.
    /// If used together with `include` and both match, `exclude` takes precedence.
    pub exclude: Vec<String>,
    /// Confirm execution of potentially dangerous actions (e.g. deleting mailboxes)
    pub force: bool,
}

impl std::fmt::Debug for SyncOptions {
    /// Formats all options like the derived implementation would, except that
    /// the password is never written out.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SyncOptions")
            .field("uid", &self.uid)
            .field("local", &self.local)
            .field("remote", &self.remote)
            .field("user", &self.user)
            .field("password", &"<redacted>")
            .field("threads", &self.threads)
            .field("unsafe_tls", &self.unsafe_tls)
            .field("disable_tls", &self.disable_tls)
            .field("list_mailbox_actions", &self.list_mailbox_actions)
            .field("include", &self.include)
            .field("exclude", &self.exclude)
            .field("force", &self.force)
            .finish()
    }
}
136
/// Evaluates `$x`, logs at debug level how many milliseconds that took
/// (prefixed with the label `$m`) and yields the value of `$x`.
macro_rules! measure {
    ( $m:expr, $x:expr ) => {{
        let timer = Instant::now();
        let value = $x;
        let elapsed = timer.elapsed();
        debug!("{} in {}ms", $m, elapsed.as_millis());
        value
    }};
}
146
147const DATA_ITEMS_HMS: &str = "(HIGHESTMODSEQ)";
148
149fn get_hierarchy_delimiter(names: &ExtendedNames) -> Result<String, Error> {
150    let delims: BTreeSet<&str> = names
151        .iter()
152        .filter_map(|(name, _)| name.delimiter())
153        .collect();
154    if delims.len() != 1 {
155        return Err(Error::MultipleHierarchyDelimiters());
156    }
157    Ok(String::from(delims.into_iter().next().unwrap()))
158}
159
160fn new_session(opts: &SyncOptions) -> Result<Session<Box<dyn ImapConnection>>, Error> {
161    let (host, port) = match opts.remote.rsplit_once(':') {
162        Some((host, port)) => (host, port),
163        None => (opts.remote.as_str(), "993"),
164    };
165
166    let port = match port.parse() {
167        Ok(p) => p,
168        Err(e) => {
169            error!("failed to parse remote port: {}", e);
170            return Err(Error::Config("invalid port"));
171        }
172    };
173
174    debug!("Connecting to {}:{}", host, port);
175
176    let client = if opts.disable_tls {
177        ClientBuilder::new(host, port)
178            .mode(imap::ConnectionMode::Plaintext)
179            .connect()?
180    } else if opts.unsafe_tls {
181        ClientBuilder::new(host, port)
182            .danger_skip_tls_verify(true)
183            .connect()?
184    } else {
185        ClientBuilder::new(host, port).connect()?
186    };
187
188    debug!("Logging in as {}", &opts.user);
189    let mut session = client.login(&opts.user, &opts.password).map_err(|e| e.0)?;
190
191    session.run_command_and_read_response("ENABLE QRESYNC")?;
192
193    Ok(session)
194}
195
196fn check_server_capabilities_(opts: &SyncOptions) -> Result<(), Error> {
197    let mut session = new_session(opts)?;
198    let caps = session.capabilities()?;
199    for cap in caps.iter() {
200        trace!("Server capability: {:?}", cap);
201    }
202    session.logout()?;
203    if !caps.has_str("QRESYNC") {
204        return Err(Error::MissingCapability("QRESYNC (RFC 7162)"));
205    }
206    if !caps.has_str("UIDPLUS") {
207        return Err(Error::MissingCapability("UIDPLUS (RFC 4315)"));
208    }
209    if !caps.has_str("LIST-STATUS") {
210        return Err(Error::MissingCapability("LIST-STATUS (RFC 5819)"));
211    }
212    Ok(())
213}
214
215/// List remote and local mailboxes with log level "info".
216///
217/// This is intended for human consumption. It can e.g. be presented to the
218/// user as a verification that all mailboxes are found as expected.
219pub fn list_mailboxes(opts: &SyncOptions) -> Result<(), Error> {
220    let mut session = new_session(opts)?;
221    let names = session.list_status(None, Some("*"), DATA_ITEMS_HMS)?;
222    if names.is_empty() {
223        return Err(Error::NoRemoteMailboxes());
224    }
225
226    let delimiter = get_hierarchy_delimiter(&names)?;
227    let remote_mailboxes = state::load_remote_mailboxes(&names);
228    let local_mailboxes = state::load_local_mailboxes(&opts.uid, &opts.local, &delimiter)?;
229
230    info!("Hierarchy delimiter: {}", delimiter);
231    info!("Remote mailboxes:");
232    for m in remote_mailboxes.keys() {
233        info!("  {}", m);
234    }
235    info!("Local mailboxes:");
236    for m in local_mailboxes.keys() {
237        info!("  {}", m);
238    }
239    Ok(())
240}
241
242/// Check if the configured server supports the required IMAP capabilities
243///
244/// Currently, at least the QRESYNC capability is required ([RFC 7162][1]).
245///
246/// [1]: https://www.rfc-editor.org/rfc/rfc7162.html
247pub fn check_server_capabilities(opts: &SyncOptions) -> Result<(), Error> {
248    measure!(
249        format!("Checked capabilities for {}", opts.remote),
250        check_server_capabilities_(opts)
251    )
252}
253
254fn sync_mailbox<T: io::Read + io::Write>(
255    state_id: &str,
256    m2store: impl AsRef<Path>,
257    sync_job: &state::SyncJob,
258    direction: &SyncDirection,
259    session: &mut Session<T>,
260    abort: &Arc<AtomicBool>,
261) -> Result<(), Error> {
262    let mailbox = &sync_job.name;
263    let dirname = state::mailbox_to_dirname(mailbox, &sync_job.delimiter);
264    let mbpath: PathBuf = [m2store.as_ref(), dirname.as_ref()].iter().collect();
265
266    match sync_job.action {
267        state::SyncAction::Sync => (),
268        state::SyncAction::CreateLocal => {
269            trace!("Creating local mailbox {}", mailbox);
270            // Local dir creation will happen automatically
271        }
272        state::SyncAction::CreateRemote => {
273            trace!("Creating remote mailbox {}", mailbox);
274            session.create(mailbox)?
275        }
276        state::SyncAction::DeleteLocal => {
277            trace!("Deleting local mailbox {}", mailbox);
278            return fs::remove_dir_all(&mbpath).map_err(Error::IO);
279        }
280        state::SyncAction::DeleteRemote => {
281            trace!("Deleting remote mailbox {}", mailbox);
282            return session.delete(mailbox).map_err(Error::IMAP);
283        }
284    };
285
286    let state = measure!(
287        format!("Loaded state for {}", mailbox),
288        state::SyncState::load(&mbpath, state_id)?
289    );
290
291    let remote = session.select(mailbox)?;
292
293    match direction {
294        SyncDirection::Pull => sync::pull(session, sync_job, remote, state, abort),
295        SyncDirection::Push => sync::push(session, sync_job, remote, state, abort),
296        SyncDirection::TwoWay => sync::sync(session, sync_job, remote, state, abort),
297    }
298}
299
300fn worker_thread(
301    i: u8,
302    opts: &SyncOptions,
303    direction: SyncDirection,
304    rx: spmc::Receiver<state::SyncJob>,
305    tx: mpsc::Sender<state::SyncJob>,
306    session: Option<Session<Box<dyn ImapConnection>>>,
307    abort: Arc<AtomicBool>,
308) -> Result<(), Error> {
309    let mut imap_session = match session {
310        Some(s) => s,
311        None => new_session(opts)?,
312    };
313
314    while let Ok(mut job) = rx.recv() {
315        if abort.load(Ordering::Relaxed) {
316            warn!("Worker thread {} exiting - interrupted", i);
317            break;
318        };
319        trace!("Syncing {} in worker thread {}", job.name, i);
320        match measure!(
321            format!("Synced {}", job.name),
322            sync_mailbox(
323                &opts.uid,
324                &opts.local,
325                &job,
326                &direction,
327                &mut imap_session,
328                &abort
329            )
330        ) {
331            Ok(_) => job.success = true,
332            Err(e) => error!("Error syncing {}: {}", job.name, e),
333        }
334        tx.send(job)?;
335    }
336    drop(tx);
337    imap_session.logout()?;
338    Ok(())
339}
340
341fn sync_(
342    opts: &SyncOptions,
343    direction: SyncDirection,
344    abort: Arc<AtomicBool>,
345) -> Result<(), Error> {
346    info!("Syncing from {} to {}", opts.remote, opts.local);
347
348    fs::create_dir_all(&opts.local)?;
349
350    let root_state = state::RootState::load(&opts.local, &opts.uid)?;
351
352    let mut imap_session = new_session(opts)?;
353
354    let names = imap_session.list_status(None, Some("*"), DATA_ITEMS_HMS)?;
355
356    if names.is_empty() {
357        return Err(Error::NoRemoteMailboxes());
358    }
359
360    let includes: Vec<WildMatch> = opts.include.iter().map(|p| WildMatch::new(p)).collect();
361    let excludes: Vec<WildMatch> = opts.exclude.iter().map(|p| WildMatch::new(p)).collect();
362
363    let delimiter = get_hierarchy_delimiter(&names)?;
364    let remote_mailboxes_with_state = state::load_remote_mailboxes(&names);
365    let remote_mailboxes: BTreeSet<&String> = remote_mailboxes_with_state.keys().collect();
366    let local_mailboxes_with_state = measure!(
367        format!("Loaded local mailboxes"),
368        state::load_local_mailboxes(&opts.uid, &opts.local, &delimiter)?
369    );
370    let local_mailboxes: BTreeSet<&String> = local_mailboxes_with_state.keys().collect();
371
372    trace!("local: {:?}", local_mailboxes);
373    trace!("remote: {:?}", remote_mailboxes);
374
375    let only_local: BTreeSet<&String> = local_mailboxes
376        .difference(&remote_mailboxes)
377        .cloned()
378        .collect();
379    trace!("Only local: {:?}", only_local);
380    let only_remote: BTreeSet<&String> = remote_mailboxes
381        .difference(&local_mailboxes)
382        .cloned()
383        .collect();
384    trace!("Only remote: {:?}", only_remote);
385    let sync_jobs: Vec<state::SyncJob> = local_mailboxes
386        .union(&remote_mailboxes)
387        .cloned()
388        .filter_map(|name| {
389            // Apply user-provided include/exclude filter
390            if !includes.is_empty() && !includes.iter().any(|e| e.matches(name)) {
391                trace!("Skipping {} due to include filter", name);
392                return None;
393            }
394            if excludes.iter().any(|e| e.matches(name)) {
395                trace!("Skipping {} due to exclude filter", name);
396                return None;
397            }
398            trace!("Processing mailbox {}", name);
399            let delimiter = delimiter.clone();
400
401            // We'll have to go through the tedious process of determining what to do
402            // with mailboxes that are missing on one side.
403            let name = name.clone();
404            if only_local.contains(&name) {
405                match direction {
406                    SyncDirection::Pull => {
407                        // Delete locally, also from root state
408                        SyncJob::new(name, delimiter, SyncAction::DeleteLocal)
409                    }
410                    SyncDirection::Push => {
411                        // Create remotely, add to root state
412                        SyncJob::new(name, delimiter, SyncAction::CreateRemote)
413                    }
414                    SyncDirection::TwoWay => {
415                        let dirname = state::mailbox_to_dirname(&name, &delimiter);
416                        if root_state
417                            .contains_subdir(&dirname)
418                            .expect("failed to inspect current state")
419                        {
420                            // Exists locally only because removed on server side
421                            // Delete locally, also from root state
422                            SyncJob::new(name, delimiter, SyncAction::DeleteLocal)
423                        } else {
424                            // Exists locally only because created here
425                            // Create remotely, add to root state
426                            SyncJob::new(name, delimiter, SyncAction::CreateRemote)
427                        }
428                    }
429                }
430            } else if only_remote.contains(&name) {
431                match direction {
432                    SyncDirection::Pull => {
433                        // Create locally (already happening), add to root state (after sync?)
434                        SyncJob::new(name, delimiter, SyncAction::CreateLocal)
435                    }
436                    SyncDirection::Push => {
437                        // Delete remotely, also from root state
438                        SyncJob::new(name, delimiter, SyncAction::DeleteRemote)
439                    }
440                    SyncDirection::TwoWay => {
441                        let dirname = state::mailbox_to_dirname(&name, &delimiter);
442                        if root_state
443                            .contains_subdir(&dirname)
444                            .expect("failed to inspect current state")
445                        {
446                            // Exists remotely only because removed locally
447                            // Delete remotely, also from root state
448                            SyncJob::new(name, delimiter, SyncAction::DeleteRemote)
449                        } else {
450                            // Exists remotely only because created there
451                            // Create locally (already happening), add to root state (after sync?)
452                            SyncJob::new(name, delimiter, SyncAction::CreateLocal)
453                        }
454                    }
455                }
456            } else {
457                // Exists on both sides, just sync
458                let state = local_mailboxes_with_state.get(&name).unwrap();
459                let modseq = remote_mailboxes_with_state.get(&name).unwrap();
460                if !state.has_local_changes() && state.last_seen_highest_mod_seq() == *modseq {
461                    trace!("Skipping {} because HIGHESTMODSEQ is in sync", name);
462                    return None;
463                }
464                SyncJob::new(name, delimiter, state::SyncAction::Sync)
465            }
466        })
467        .collect();
468
469    if opts.list_mailbox_actions {
470        info!("The following actions would be performed:");
471        for job in sync_jobs {
472            info!("  {}: {:?}", job.name, job.action);
473        }
474        return Ok(());
475    }
476
477    let bail = sync_jobs.iter().any(|job| match job.action {
478        state::SyncAction::DeleteLocal => {
479            warn!("About to delete local mailbox {}", job.name);
480            true
481        }
482        state::SyncAction::DeleteRemote => {
483            warn!("About to delete remote mailbox {}", job.name);
484            true
485        }
486        _ => false,
487    });
488    if bail && !opts.force {
489        return Err(Error::DangerousAction());
490    }
491
492    let thread_count = if sync_jobs.len() < opts.threads.into() {
493        // Conversion must be safe here
494        sync_jobs.len().try_into().unwrap()
495    } else {
496        opts.threads
497    };
498
499    let (mut tx_work, rx_work) = spmc::channel::<state::SyncJob>();
500    let (tx_result, rx_result) = mpsc::channel::<state::SyncJob>();
501
502    info!(
503        "Using {} threads to sync {} mailboxes",
504        thread_count,
505        sync_jobs.len()
506    );
507
508    let mut sess = iter::once(imap_session);
509
510    let threads: Vec<_> = (0..thread_count)
511        .map(|i| {
512            let rx = rx_work.clone();
513            let tx = tx_result.clone();
514            let opts = (*opts).clone();
515            let dir = direction.clone();
516            let session = sess.next();
517            let abort = Arc::clone(&abort);
518
519            thread::spawn(move || {
520                if let Err(e) = worker_thread(i, &opts, dir, rx, tx, session, abort) {
521                    error!("Error in worker thread {}: {}", i, e);
522                };
523            })
524        })
525        .collect();
526
527    let expected_results = sync_jobs.len();
528    let mut received_results = 0usize;
529
530    for job in sync_jobs {
531        tx_work.send(job)?;
532    }
533
534    drop(tx_work);
535    drop(tx_result);
536
537    let mut success = true;
538
539    while let Ok(job) = rx_result.recv() {
540        // update root state if needed
541        trace!("root state update: {:?} {}", job.action, job.name);
542        success &= job.success;
543        received_results += 1;
544
545        root_state.sync_done(job)?;
546    }
547
548    for t in threads {
549        t.join().unwrap();
550    }
551
552    if success && (expected_results == received_results) {
553        info!("Sync successful");
554        Ok(())
555    } else {
556        error!("Sync failed!");
557        Err(Error::SyncIncomplete())
558    }
559}
560
561/// Apply all changes from IMAP to the local m2dir
562///
563/// This overwrites any changes that may have happened on the local side.
564///
565/// <div class="warning">
566///
567/// Synchronizing local and remote state as quickly as possible requires an
568/// additional index, which itself has to be kept in sync with the actual
569/// storage. Doing this all atomically is next to impossible. Any caller of this
570/// function **should catch termination signals!** Setting the inner value of
571/// the `abort` argument to `true` will make the function return an error as
572/// early as possible, while still making sure the persisted state is
573/// consistent.
574///
575/// </div>
576///
577/// # Arguments
578///
579/// * `opts`: options controlling the synchronization process
580/// * `abort`: if the inner value becomes true at any time, the sync will abort
581///   as soon as possible without corrupting the index
582pub fn pull(opts: &SyncOptions, abort: Arc<AtomicBool>) -> Result<(), Error> {
583    measure!(
584        format!("Pulled from {}", opts.remote),
585        sync_(opts, SyncDirection::Pull, abort)
586    )
587}
588
589/// Apply all changes in local m2dir to IMAP
590///
591/// This overwrites any changes that may have happened on the IMAP side.
592///
593/// <div class="warning">
594///
595/// Synchronizing local and remote state as quickly as possible requires an
596/// additional index, which itself has to be kept in sync with the actual
597/// storage. Doing this all atomically is next to impossible. Any caller of this
598/// function **should catch termination signals!** Setting the inner value of
599/// the `abort` argument to `true` will make the function return an error as
600/// early as possible, while still making sure the persisted state is
601/// consistent.
602///
603/// </div>
604///
605/// # Arguments
606///
607/// * `opts`: options controlling the synchronization process
608/// * `abort`: if the inner value becomes true at any time, the sync will abort
609///   as soon as possible without corrupting the index
610pub fn push(opts: &SyncOptions, abort: Arc<AtomicBool>) -> Result<(), Error> {
611    measure!(
612        format!("Pushed to {}", opts.remote),
613        sync_(opts, SyncDirection::Push, abort)
614    )
615}
616
617/// Synchronize local and remote changes between IMAP and local m2dir
618///
619/// This includes fetching new mail, deleting expunged mails, updating tags,
620/// etc.
621///
622/// <div class="warning">
623///
624/// Synchronizing local and remote state as quickly as possible requires an
625/// additional index, which itself has to be kept in sync with the actual
626/// storage. Doing this all atomically is next to impossible. Any caller of this
627/// function **should catch termination signals!** Setting the inner value of
628/// the `abort` argument to `true` will make the function return an error as
629/// early as possible, while still making sure the persisted state is
630/// consistent.
631///
632/// </div>
633///
634/// # Arguments
635///
636/// * `opts`: options controlling the synchronization process
637/// * `abort`: if the inner value becomes true at any time, the sync will abort
638///   as soon as possible without corrupting the index
639pub fn sync(opts: &SyncOptions, abort: Arc<AtomicBool>) -> Result<(), Error> {
640    measure!(
641        format!("Synced with {}", opts.remote),
642        sync_(opts, SyncDirection::TwoWay, abort)
643    )
644}