vomit_sync/
lib.rs

1//! # vomit-sync
2//!
3//! `vomit-sync` provides full two-way synchronization between IMAP and
4//! a local maildir. At the moment, it has to be called periodically to
5//! keep the two synchronized.
6//!
7//! It uses the [log][1] crate for logging, so you can receive logs from it by
8//! using any of the compatible logging libraries.
9//!
10//! [1]: https://crates.io/crates/log
11//!
12//! [vsync][2] is small CLI wrapper around `vomit-sync`.
13//!
14//! [2]: https://git.sr.ht/~bitfehler/vomit-sync/tree/master/item/cli/
15//!
16//! As the name implies, `vomit-sync` is part of the [vomit project][3].
17//!
18//! [3]: https://sr.ht/~bitfehler/vomit
19
20use imap::extensions::list_status::ExtendedNames;
21use imap::ClientBuilder;
22use imap::Session;
23use log::{debug, error, info, trace};
24use maildir::Maildir;
25use std::collections::BTreeSet;
26use std::fs;
27use std::io;
28use std::iter::Iterator;
29use std::path::PathBuf;
30use std::sync::mpsc;
31use std::thread;
32use std::time::Instant;
33use thiserror::Error;
34use vomit::{Mailbox, VomitError};
35use wildmatch::WildMatch;
36
37mod flags;
38mod mailboxes;
39mod seqset;
40mod state;
41mod sync;
42
43#[derive(Error, Debug)]
44pub enum SyncError {
45    #[error("IMAP error: {0}")]
46    ProtocolError(#[from] imap::Error),
47    #[error("invalid configuration: {0}")]
48    ConfigError(&'static str),
49    #[error("Refused to perform dangerous action - set 'force' option to override")]
50    DangerousActionError(),
51    #[error("failed to load state: {0}")]
52    StateError(#[from] state::StateError),
53    #[error("IPC error: {0}")]
54    IPCError(#[from] spmc::SendError<mailboxes::SyncJob>),
55    #[error("error accessing maildir: {0}")]
56    IOError(#[from] io::Error),
57    #[error("error managing maildir: {0}")]
58    MaildirError(#[from] maildir::MaildirError),
59    #[error("{0}")]
60    MailboxesError(#[from] mailboxes::MailboxesError),
61    #[error("error in maildir abstraction library: {0}")]
62    VomitError(#[from] VomitError),
63    #[error("{0}")]
64    Error(&'static str),
65    #[error("{0}")]
66    E(String),
67}
68
69#[derive(Clone, Debug, PartialEq, Eq)]
70pub enum SyncDirection {
71    Pull,
72    Push,
73    TwoWay,
74}
75
76/// A set of options to control the synchronization process
77#[derive(Clone, Debug)]
78pub struct SyncOptions {
79    /// The local maildir to sync to
80    pub local: String,
81    /// The IMAPS URL to sync from
82    pub remote: String,
83    /// The user for IMAP authentication
84    pub user: String,
85    /// The password for IMAP authentication
86    pub password: String,
87    /// The number of threads to use
88    ///
89    /// Each thread will use it's own IMAP session, so this must not be greater
90    /// than the number of concurrent user sessions allowed by the IMAP server.
91    pub threads: u8,
92    /// Disable TLS certificate checks (e.g. for self-signed certs) (insecure)
93    pub unsafe_tls: bool,
94    /// Completely disable TLS (very insecure)
95    pub disable_tls: bool,
96    /// Only log with info level what actions (sync, create, delete) would be taken on
97    /// which mailboxes (taking include/exclude into account), then exit.
98    pub list_mailbox_actions: bool,
99    /// A list of wildcard patterns to include only folders that match any of them.
100    ///
101    /// The wildcard pattern only supports `?` and `*` and must match the full mailbox name.
102    pub include: Vec<String>,
103    /// A list of wildcard patterns to exclude all folders that match any of them.
104    ///
105    /// The wildcard pattern only supports `?` and `*` and must match the full mailbox name.
106    /// If used together with `include` and both match, `exclude` takes precedence.
107    pub exclude: Vec<String>,
108    /// Confirm execution of potentially dangerous actions (e.g. deleting mailboxes)
109    pub force: bool,
110}
111
112macro_rules! measure {
113    ( $m:expr, $x:expr ) => {{
114        let start = Instant::now();
115        let result = $x;
116        let duration = start.elapsed();
117        debug!("{} in {}ms", $m, duration.as_millis());
118        result
119    }};
120}
121
122const DATA_ITEMS_HMS: &str = "(HIGHESTMODSEQ)";
123
124fn get_hierarchy_delimiter(names: &ExtendedNames) -> Result<String, SyncError> {
125    let delims: BTreeSet<&str> = names
126        .iter()
127        .filter_map(|(name, _)| name.delimiter())
128        .collect();
129    if delims.len() != 1 {
130        return Err(SyncError::E(format!(
131            "Expected exactly on hierarchy delimiter, found {:?}",
132            delims
133        )));
134    }
135    Ok(String::from(delims.into_iter().next().unwrap()))
136}
137
138fn new_session(
139    opts: &SyncOptions,
140) -> Result<Session<impl std::io::Read + std::io::Write>, SyncError> {
141    let (host, port) = match opts.remote.rsplit_once(':') {
142        Some((host, port)) => (host, port),
143        None => (opts.remote.as_str(), "993"),
144    };
145
146    let port = match port.parse() {
147        Ok(p) => p,
148        Err(e) => {
149            error!("failed to parse remote port: {}", e);
150            return Err(SyncError::ConfigError("invalid port"));
151        }
152    };
153
154    debug!("Connecting to {}:{}", host, port);
155
156    let client = if opts.disable_tls {
157        ClientBuilder::new(host, port)
158            .mode(imap::ConnectionMode::Plaintext)
159            .connect()?
160    } else if opts.unsafe_tls {
161        ClientBuilder::new(host, port)
162            .danger_skip_tls_verify(true)
163            .connect()?
164    } else {
165        ClientBuilder::new(host, port).connect()?
166    };
167
168    debug!("Logging in as {}", &opts.user);
169    let mut session = client.login(&opts.user, &opts.password).map_err(|e| e.0)?;
170
171    session.run_command_and_read_response("ENABLE QRESYNC")?;
172
173    Ok(session)
174}
175
176fn check_server_capabilities_(opts: &SyncOptions) -> Result<(), SyncError> {
177    let mut session = new_session(opts)?;
178    let caps = session.capabilities()?;
179    for cap in caps.iter() {
180        trace!("Server capability: {:?}", cap);
181    }
182    session.logout()?;
183    if !caps.has_str("QRESYNC") {
184        return Err(SyncError::E(
185            "Server does not support QRESYNC (RFC 7162)".to_string(),
186        ));
187    }
188    if !caps.has_str("UIDPLUS") {
189        return Err(SyncError::E(
190            "Server does not support UIDPLUS (RFC 4315)".to_string(),
191        ));
192    }
193    if !caps.has_str("LIST-STATUS") {
194        return Err(SyncError::E(
195            "Server does not support LIST-STATUS (RFC 5819)".to_string(),
196        ));
197    }
198    Ok(())
199}
200
201/// List remote and local mailboxes with log level "info".
202///
203/// This is intended for human consumption. It can e.g. be presented to the
204/// user as a verification that all mailboxes are found as expected.
205pub fn list_mailboxes(opts: &SyncOptions) -> Result<(), SyncError> {
206    let mut session = new_session(opts)?;
207    let names = session.list_status(None, Some("*"), DATA_ITEMS_HMS)?;
208    if names.is_empty() {
209        return Err(SyncError::Error("No remote mailboxes found"));
210    }
211
212    let delimiter = get_hierarchy_delimiter(&names)?;
213    let remote_mailboxes = mailboxes::load_remote(&names);
214    let local_mailboxes = mailboxes::load_local(&opts.local, &delimiter)?;
215
216    info!("Hierarchy delimiter: {}", delimiter);
217    info!("Remote mailboxes:");
218    for m in remote_mailboxes.keys() {
219        info!("  {}", m);
220    }
221    info!("Local mailboxes:");
222    for m in local_mailboxes.keys() {
223        info!("  {}", m);
224    }
225    Ok(())
226}
227
228/// Check if the configured server supports the required IMAP capabilities
229///
230/// Currently, at least the QRESYNC capability is required ([RFC 7162][1]).
231///
232/// [1]: https://www.rfc-editor.org/rfc/rfc7162.html
233pub fn check_server_capabilities(opts: &SyncOptions) -> Result<(), SyncError> {
234    measure!(
235        format!("Checked capabilities for {}", opts.remote),
236        check_server_capabilities_(opts)
237    )
238}
239
240fn sync_mailbox<T: io::Read + io::Write>(
241    maildir_root: &str,
242    sync_job: &mailboxes::SyncJob,
243    direction: &SyncDirection,
244    session: &mut Session<T>,
245) -> Result<(), SyncError> {
246    let mailbox = &sync_job.name;
247    let dirname = Mailbox::virtual_to_dir(mailbox, &sync_job.delimiter);
248    let mbpath: PathBuf = [maildir_root, &dirname].iter().collect();
249
250    match sync_job.action {
251        mailboxes::SyncAction::Sync => (),
252        mailboxes::SyncAction::CreateLocal => {
253            trace!("Creating local mailbox {}", mailbox);
254            // Local dir creation will happen automatically
255        }
256        mailboxes::SyncAction::CreateRemote => {
257            trace!("Creating remote mailbox {}", mailbox);
258            session.create(mailbox)?
259        }
260        mailboxes::SyncAction::DeleteLocal => {
261            trace!("Deleting local mailbox {}", mailbox);
262            return fs::remove_dir_all(&mbpath).map_err(SyncError::IOError);
263        }
264        mailboxes::SyncAction::DeleteRemote => {
265            trace!("Deleting remote mailbox {}", mailbox);
266            return session.delete(mailbox).map_err(SyncError::ProtocolError);
267        }
268    };
269
270    let maildir = Maildir::from(mbpath);
271    maildir.create_dirs()?;
272
273    let state = measure!(
274        format!("Loaded state for {}", mailbox),
275        state::SyncState::load(&maildir.path())?
276    );
277
278    let remote = session.select(mailbox)?;
279
280    match direction {
281        SyncDirection::Pull => sync::pull(session, sync_job, maildir, remote, state),
282        SyncDirection::Push => sync::push(session, sync_job, maildir, remote, state),
283        SyncDirection::TwoWay => sync::sync(session, sync_job, maildir, remote, state),
284    }
285    .map_err(|e| SyncError::E(format!("{}: {}", mailbox, e)))
286}
287
288fn worker_thread(
289    i: u8,
290    opts: &SyncOptions,
291    direction: SyncDirection,
292    rx: spmc::Receiver<mailboxes::SyncJob>,
293    tx: mpsc::Sender<mailboxes::SyncJob>,
294) -> Result<(), SyncError> {
295    let mut imap_session = new_session(opts)?;
296    while let Ok(job) = rx.recv() {
297        trace!("Syncing {} in worker thread {}", job.name, i);
298        measure!(
299            format!("Synced {}", job.name),
300            sync_mailbox(&opts.local, &job, &direction, &mut imap_session)?
301        );
302        tx.send(job)?;
303    }
304    drop(tx);
305    imap_session.logout()?;
306    Ok(())
307}
308
309fn root_state_thread(
310    root_dir: String,
311    rx_results: mpsc::Receiver<mailboxes::SyncJob>,
312) -> Result<(), SyncError> {
313    trace!("root state thread managing state in {}", root_dir);
314    let mut root_state = state::RootState::load(&root_dir)?;
315    while let Ok(job) = rx_results.recv() {
316        // update root state if needed
317        trace!("root state update: {:?} {}", job.action, job.name);
318        mailboxes::sync_done(job, &mut root_state);
319        root_state.save()?;
320    }
321    Ok(())
322}
323
324fn sync_(opts: &SyncOptions, direction: SyncDirection) -> Result<(), SyncError> {
325    info!("Syncing from {} to {}", opts.remote, opts.local);
326
327    fs::create_dir_all(&opts.local)?;
328
329    let root_state = state::RootState::load(&opts.local)?;
330
331    let mut imap_session = new_session(opts)?;
332
333    let names = imap_session.list_status(None, Some("*"), DATA_ITEMS_HMS)?;
334
335    if names.is_empty() {
336        return Err(SyncError::Error("No remote mailboxes found, giving up"));
337    }
338
339    let includes: Vec<WildMatch> = opts.include.iter().map(|p| WildMatch::new(p)).collect();
340    let excludes: Vec<WildMatch> = opts.exclude.iter().map(|p| WildMatch::new(p)).collect();
341
342    let delimiter = get_hierarchy_delimiter(&names)?;
343    let remote_mailboxes_with_state = mailboxes::load_remote(&names);
344    let remote_mailboxes: BTreeSet<&String> = remote_mailboxes_with_state.keys().collect();
345    let local_mailboxes_with_state = measure!(
346        format!("Loaded local mailboxes"),
347        mailboxes::load_local(&opts.local, &delimiter)?
348    );
349    let local_mailboxes: BTreeSet<&String> = local_mailboxes_with_state.keys().collect();
350
351    trace!("local: {:?}", local_mailboxes);
352    trace!("remote: {:?}", remote_mailboxes);
353
354    let only_local: BTreeSet<&String> = local_mailboxes
355        .difference(&remote_mailboxes)
356        .cloned()
357        .collect();
358    trace!("Only local: {:?}", only_local);
359    let only_remote: BTreeSet<&String> = remote_mailboxes
360        .difference(&local_mailboxes)
361        .cloned()
362        .collect();
363    trace!("Only remote: {:?}", only_remote);
364    let sync_jobs: Vec<mailboxes::SyncJob> = local_mailboxes
365        .union(&remote_mailboxes)
366        .cloned()
367        .filter_map(|name| {
368            // Apply user-provided include/exclude filter
369            if !includes.is_empty() && !includes.iter().any(|e| e.matches(name)) {
370                trace!("Skipping {} due to include filter", name);
371                return None;
372            }
373            if excludes.iter().any(|e| e.matches(name)) {
374                trace!("Skipping {} due to exclude filter", name);
375                return None;
376            }
377            trace!("Processing mailbox {}", name);
378            let delimiter = delimiter.clone();
379
380            // We'll have to go through the tedious process of determining what to do
381            // with mailboxes that are missing on one side.
382            // trace!("Checking what to do with {}", name);
383            let name = name.clone();
384            if only_local.contains(&name) {
385                match direction {
386                    SyncDirection::Pull => {
387                        // Delete locally, also from root state
388                        mailboxes::sync_delete_local(name, delimiter)
389                    }
390                    SyncDirection::Push => {
391                        // Create remotely, add to root state
392                        mailboxes::sync_create_remote(name, delimiter)
393                    }
394                    SyncDirection::TwoWay => {
395                        let dirname = Mailbox::virtual_to_dir(&name, &delimiter);
396                        if root_state.subdirs.contains(&dirname) {
397                            // Exists locally only because removed on server side
398                            // Delete locally, also from root state
399                            mailboxes::sync_delete_local(name, delimiter)
400                        } else {
401                            // Exists locally only because created here
402                            // Create remotely, add to root state
403                            mailboxes::sync_create_remote(name, delimiter)
404                        }
405                    }
406                }
407            } else if only_remote.contains(&name) {
408                match direction {
409                    SyncDirection::Pull => {
410                        // Create locally (already happening), add to root state (after sync?)
411                        mailboxes::sync_create_local(name, delimiter)
412                    }
413                    SyncDirection::Push => {
414                        // Delete remotely, also from root state
415                        mailboxes::sync_delete_remote(name, delimiter)
416                    }
417                    SyncDirection::TwoWay => {
418                        let dirname = Mailbox::virtual_to_dir(&name, &delimiter);
419                        if root_state.subdirs.contains(&dirname) {
420                            // Exists remotely only because removed locally
421                            // Delete remotely, also from root state
422                            mailboxes::sync_delete_remote(name, delimiter)
423                        } else {
424                            // Exists remotely only because created there
425                            // Create locally (already happening), add to root state (after sync?)
426                            mailboxes::sync_create_local(name, delimiter)
427                        }
428                    }
429                }
430            } else {
431                // Exists on both sides, just sync
432                let state = local_mailboxes_with_state.get(&name).unwrap();
433                let modseq = remote_mailboxes_with_state.get(&name).unwrap();
434                if !state.has_local_changes() && state.last_seen.highest_mod_seq == *modseq {
435                    trace!("Skipping {} because HIGHESTMODSEQ is in sync", name);
436                    return None;
437                }
438                mailboxes::sync(name, delimiter)
439            }
440        })
441        .collect();
442
443    if opts.list_mailbox_actions {
444        info!("The following actions would be performed:");
445        for job in sync_jobs {
446            info!("  {}: {:?}", job.name, job.action);
447        }
448        return Ok(());
449    }
450
451    let bail = sync_jobs
452        .iter()
453        .map(|job| match job.action {
454            mailboxes::SyncAction::DeleteLocal => {
455                error!("Refusing to delete local mailbox {}", job.name);
456                true
457            }
458            mailboxes::SyncAction::DeleteRemote => {
459                error!("Refusing to delete remote mailbox {}", job.name);
460                true
461            }
462            _ => false,
463        })
464        .any(|b| b);
465    if bail {
466        return Err(SyncError::DangerousActionError());
467    }
468
469    drop(root_state);
470
471    let thread_count = if sync_jobs.len() < opts.threads.into() {
472        // Conversion must be safe here
473        sync_jobs.len().try_into().unwrap()
474    } else {
475        opts.threads
476    };
477
478    let mut threads = Vec::new();
479    let (mut tx_work, rx_work) = spmc::channel::<mailboxes::SyncJob>();
480    let (tx_result, rx_result) = mpsc::channel::<mailboxes::SyncJob>();
481
482    let root_dir = opts.local.clone();
483    let t = thread::spawn(move || {
484        if let Err(e) = root_state_thread(root_dir, rx_result) {
485            error!("Error in root state thread: {}", e);
486        };
487    });
488    threads.push(t);
489
490    info!(
491        "Using {} threads to sync {} mailboxes",
492        thread_count,
493        sync_jobs.len()
494    );
495    for i in 1..thread_count {
496        let rx = rx_work.clone();
497        let tx = tx_result.clone();
498        let opts = (*opts).clone();
499        let dir = direction.clone();
500
501        let t = thread::spawn(move || {
502            if let Err(e) = worker_thread(i, &opts, dir, rx, tx) {
503                error!("Error in worker thread {}: {}", i, e);
504            };
505        });
506        threads.push(t);
507    }
508    for job in sync_jobs {
509        tx_work.send(job)?;
510    }
511    drop(tx_work);
512
513    while let Ok(job) = rx_work.recv() {
514        debug!("Syncing {} in main thread ({:?})", job.name, job.action);
515        measure!(
516            format!("Synced {}", job.name),
517            sync_mailbox(&opts.local, &job, &direction, &mut imap_session)?
518        );
519        tx_result.send(job)?;
520    }
521    drop(tx_result);
522
523    _ = imap_session.logout();
524
525    for t in threads {
526        t.join().unwrap();
527    }
528
529    info!("Sync successful");
530    Ok(())
531}
532
533/// Apply all changes from IMAP to the local maildir
534///
535/// This overwrites any changes that may have happened on the local side.
536pub fn pull(opts: &SyncOptions) -> Result<(), SyncError> {
537    measure!(
538        format!("Pulled from {}", opts.remote),
539        sync_(opts, SyncDirection::Pull)
540    )
541}
542
543/// Apply all changes in local maildir to IMAP
544///
545/// This overwrites any changes that may have happened on the IMAP side.
546pub fn push(opts: &SyncOptions) -> Result<(), SyncError> {
547    measure!(
548        format!("Pushed to {}", opts.remote),
549        sync_(opts, SyncDirection::Push)
550    )
551}
552
553/// Synchronize local and remote changes between IMAP and local maildir
554///
555/// This includes fetching new mail, deleting expunged mails, updating tags, etc.
556pub fn sync(opts: &SyncOptions) -> Result<(), SyncError> {
557    measure!(
558        format!("Synced with {}", opts.remote),
559        sync_(opts, SyncDirection::TwoWay)
560    )
561}