1use imap::extensions::list_status::ExtendedNames;
23use imap::ClientBuilder;
24use imap::ImapConnection;
25use imap::Session;
26use log::warn;
27use log::{debug, error, info, trace};
28use std::collections::BTreeSet;
29use std::fs;
30use std::io;
31use std::iter;
32use std::iter::Iterator;
33use std::path::PathBuf;
34use std::sync::atomic::AtomicBool;
35use std::sync::atomic::Ordering;
36use std::sync::mpsc;
37use std::sync::Arc;
38use std::thread;
39use std::time::Instant;
40use wildmatch::WildMatch;
41
42use crate::state::SyncAction;
43use crate::state::SyncJob;
44
45mod flags;
46mod mailboxes;
47mod seqset;
48mod state;
49mod sync;
50
/// Errors that can occur while synchronizing a maildir with an IMAP server.
#[derive(thiserror::Error, Debug)]
pub enum Error {
    /// An error reported by the `imap` crate.
    #[error("IMAP error: {0}")]
    ProtocolError(#[from] imap::Error),
    /// The server did not provide a UIDVALIDITY value.
    #[error("server is incompatible: no UIDVALIDITY provided")]
    ServerIncompatible(),
    /// An APPEND command did not return any UIDs.
    #[error("APPEND did not return any UIDs")]
    InvalidAppendResponse(),
    /// The server did not send a UID where one was required.
    #[error("server did not send UID, giving up")]
    MissingUID(),
    /// A downloaded message had no body.
    #[error("downloaded message has no body")]
    MissingBody(),
    /// The configuration is invalid, e.g. the port of the remote address cannot be parsed.
    #[error("invalid configuration: {0}")]
    ConfigError(&'static str),
    /// The sync would delete a mailbox and the `force` option is not set.
    #[error("refused to perform dangerous action - set 'force' option to override")]
    DangerousAction(),
    /// The server lacks a required capability; the payload names it.
    #[error("server does not support {0}")]
    MissingCapability(&'static str),
    /// The listed mailboxes did not yield exactly one distinct hierarchy delimiter.
    #[error("found multiple hierarchy delimiters")]
    MultipleHierarchyDelimiters(),
    /// The server returned an empty mailbox list.
    #[error("no remote mailboxes found")]
    NoRemoteMailboxes(),
    /// UIDVALIDITY changed while local changes were still pending.
    #[error("UID validity change with pending local changes makes resynchronization impossible")]
    UIDValidityChangeWithLocalChanges(),
    /// At least one mailbox failed to sync, or not every job reported a result.
    #[error("not all mailboxes were synced successfully")]
    SyncIncomplete(),
    /// An error reported by the `state` module.
    #[error("failed to load state: {0}")]
    StateError(#[from] state::Error),
    /// Sending a sync job over a channel failed.
    #[error("IPC error: {0}")]
    IPCError(#[from] spmc::SendError<state::SyncJob>),
    /// An I/O error, reported as a failure to access the maildir.
    #[error("error accessing maildir: {0}")]
    IOError(#[from] io::Error),
    /// The operation was interrupted.
    #[error("interrupted")]
    Interrupted(),
}
88
/// Direction in which mailboxes are synchronized.
///
/// The direction decides how a mailbox that exists on only one side is handled
/// and which routine of the `sync` module processes each mailbox.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum SyncDirection {
    /// Remote-only mailboxes are created locally; local-only mailboxes are deleted locally.
    Pull,
    /// Local-only mailboxes are created remotely; remote-only mailboxes are deleted remotely.
    Push,
    /// One-sided mailboxes are created or deleted depending on the recorded root state.
    TwoWay,
}
95
/// Options controlling a connection to the IMAP server and a sync run.
#[derive(Clone, Debug)]
pub struct SyncOptions {
    /// Identifier passed to the state layer when loading root and mailbox state.
    pub uid: String,
    /// Path of the local maildir root; it is created if it does not exist.
    pub local: String,
    /// Remote server as `host` or `host:port`; the port defaults to 993.
    pub remote: String,
    /// User name used to log in.
    pub user: String,
    /// Password used to log in.
    pub password: String,
    /// Upper bound on the number of worker threads used for syncing.
    pub threads: u8,
    /// Skip TLS certificate verification. Ignored when `disable_tls` is set.
    pub unsafe_tls: bool,
    /// Connect in plaintext mode instead of using TLS.
    pub disable_tls: bool,
    /// Only log the planned per-mailbox actions, then return without syncing.
    pub list_mailbox_actions: bool,
    /// Wildcard patterns; when non-empty, only mailboxes matching one of them are processed.
    pub include: Vec<String>,
    /// Wildcard patterns; mailboxes matching any of them are skipped.
    pub exclude: Vec<String>,
    /// Allow sync runs that delete local or remote mailboxes.
    pub force: bool,
}
135
/// Evaluates `$body`, logs how long it took at debug level, and yields its value.
///
/// `$label` is the message prefix; the log line reads `"<label> in <n>ms"`.
/// The label expression is placed inside `debug!`, after `$body` has run.
macro_rules! measure {
    ( $label:expr, $body:expr ) => {{
        let started_at = Instant::now();
        let outcome = $body;
        let elapsed_ms = started_at.elapsed().as_millis();
        debug!("{} in {}ms", $label, elapsed_ms);
        outcome
    }};
}
145
/// Status data items passed to `list_status`: only HIGHESTMODSEQ is requested.
const DATA_ITEMS_HMS: &str = "(HIGHESTMODSEQ)";
147
148fn get_hierarchy_delimiter(names: &ExtendedNames) -> Result<String, Error> {
149 let delims: BTreeSet<&str> = names
150 .iter()
151 .filter_map(|(name, _)| name.delimiter())
152 .collect();
153 if delims.len() != 1 {
154 return Err(Error::MultipleHierarchyDelimiters());
155 }
156 Ok(String::from(delims.into_iter().next().unwrap()))
157}
158
159fn new_session(opts: &SyncOptions) -> Result<Session<Box<dyn ImapConnection>>, Error> {
160 let (host, port) = match opts.remote.rsplit_once(':') {
161 Some((host, port)) => (host, port),
162 None => (opts.remote.as_str(), "993"),
163 };
164
165 let port = match port.parse() {
166 Ok(p) => p,
167 Err(e) => {
168 error!("failed to parse remote port: {}", e);
169 return Err(Error::ConfigError("invalid port"));
170 }
171 };
172
173 debug!("Connecting to {}:{}", host, port);
174
175 let client = if opts.disable_tls {
176 ClientBuilder::new(host, port)
177 .mode(imap::ConnectionMode::Plaintext)
178 .connect()?
179 } else if opts.unsafe_tls {
180 ClientBuilder::new(host, port)
181 .danger_skip_tls_verify(true)
182 .connect()?
183 } else {
184 ClientBuilder::new(host, port).connect()?
185 };
186
187 debug!("Logging in as {}", &opts.user);
188 let mut session = client.login(&opts.user, &opts.password).map_err(|e| e.0)?;
189
190 session.run_command_and_read_response("ENABLE QRESYNC")?;
191
192 Ok(session)
193}
194
195fn check_server_capabilities_(opts: &SyncOptions) -> Result<(), Error> {
196 let mut session = new_session(opts)?;
197 let caps = session.capabilities()?;
198 for cap in caps.iter() {
199 trace!("Server capability: {:?}", cap);
200 }
201 session.logout()?;
202 if !caps.has_str("QRESYNC") {
203 return Err(Error::MissingCapability("QRESYNC (RFC 7162)"));
204 }
205 if !caps.has_str("UIDPLUS") {
206 return Err(Error::MissingCapability("UIDPLUS (RFC 4315)"));
207 }
208 if !caps.has_str("LIST-STATUS") {
209 return Err(Error::MissingCapability("LIST-STATUS (RFC 5819)"));
210 }
211 Ok(())
212}
213
214pub fn list_mailboxes(opts: &SyncOptions) -> Result<(), Error> {
219 let mut session = new_session(opts)?;
220 let names = session.list_status(None, Some("*"), DATA_ITEMS_HMS)?;
221 if names.is_empty() {
222 return Err(Error::NoRemoteMailboxes());
223 }
224
225 let delimiter = get_hierarchy_delimiter(&names)?;
226 let remote_mailboxes = state::load_remote_mailboxes(&names);
227 let local_mailboxes = state::load_local_mailboxes(&opts.uid, &opts.local, &delimiter)?;
228
229 info!("Hierarchy delimiter: {}", delimiter);
230 info!("Remote mailboxes:");
231 for m in remote_mailboxes.keys() {
232 info!(" {}", m);
233 }
234 info!("Local mailboxes:");
235 for m in local_mailboxes.keys() {
236 info!(" {}", m);
237 }
238 Ok(())
239}
240
241pub fn check_server_capabilities(opts: &SyncOptions) -> Result<(), Error> {
247 measure!(
248 format!("Checked capabilities for {}", opts.remote),
249 check_server_capabilities_(opts)
250 )
251}
252
253fn sync_mailbox<T: io::Read + io::Write>(
254 state_id: &str,
255 maildir_root: &str,
256 sync_job: &state::SyncJob,
257 direction: &SyncDirection,
258 session: &mut Session<T>,
259 abort: &Arc<AtomicBool>,
260) -> Result<(), Error> {
261 let mailbox = &sync_job.name;
262 let dirname = state::mailbox_to_dirname(mailbox, &sync_job.delimiter);
263 let mbpath: PathBuf = [maildir_root, &dirname].iter().collect();
264
265 match sync_job.action {
266 state::SyncAction::Sync => (),
267 state::SyncAction::CreateLocal => {
268 trace!("Creating local mailbox {}", mailbox);
269 }
271 state::SyncAction::CreateRemote => {
272 trace!("Creating remote mailbox {}", mailbox);
273 session.create(mailbox)?
274 }
275 state::SyncAction::DeleteLocal => {
276 trace!("Deleting local mailbox {}", mailbox);
277 return fs::remove_dir_all(&mbpath).map_err(Error::IOError);
278 }
279 state::SyncAction::DeleteRemote => {
280 trace!("Deleting remote mailbox {}", mailbox);
281 return session.delete(mailbox).map_err(Error::ProtocolError);
282 }
283 };
284
285 let state = measure!(
286 format!("Loaded state for {}", mailbox),
287 state::SyncState::load(&mbpath, state_id)?
288 );
289
290 let remote = session.select(mailbox)?;
291
292 match direction {
293 SyncDirection::Pull => sync::pull(session, sync_job, remote, state, abort),
294 SyncDirection::Push => sync::push(session, sync_job, remote, state, abort),
295 SyncDirection::TwoWay => sync::sync(session, sync_job, remote, state, abort),
296 }
297}
298
299fn worker_thread(
300 i: u8,
301 opts: &SyncOptions,
302 direction: SyncDirection,
303 rx: spmc::Receiver<state::SyncJob>,
304 tx: mpsc::Sender<state::SyncJob>,
305 session: Option<Session<Box<dyn ImapConnection>>>,
306 abort: Arc<AtomicBool>,
307) -> Result<(), Error> {
308 let mut imap_session = match session {
309 Some(s) => s,
310 None => new_session(opts)?,
311 };
312
313 while let Ok(mut job) = rx.recv() {
314 if abort.load(Ordering::Relaxed) {
315 warn!("Worker thread {} exiting - interrupted", i);
316 break;
317 };
318 trace!("Syncing {} in worker thread {}", job.name, i);
319 match measure!(
320 format!("Synced {}", job.name),
321 sync_mailbox(
322 &opts.uid,
323 &opts.local,
324 &job,
325 &direction,
326 &mut imap_session,
327 &abort
328 )
329 ) {
330 Ok(_) => job.success = true,
331 Err(e) => error!("Error syncing {}: {}", job.name, e),
332 }
333 tx.send(job)?;
334 }
335 drop(tx);
336 imap_session.logout()?;
337 Ok(())
338}
339
340fn sync_(
341 opts: &SyncOptions,
342 direction: SyncDirection,
343 abort: Arc<AtomicBool>,
344) -> Result<(), Error> {
345 info!("Syncing from {} to {}", opts.remote, opts.local);
346
347 fs::create_dir_all(&opts.local)?;
348
349 let root_state = state::RootState::load(&opts.local, &opts.uid)?;
350
351 let mut imap_session = new_session(opts)?;
352
353 let names = imap_session.list_status(None, Some("*"), DATA_ITEMS_HMS)?;
354
355 if names.is_empty() {
356 return Err(Error::NoRemoteMailboxes());
357 }
358
359 let includes: Vec<WildMatch> = opts.include.iter().map(|p| WildMatch::new(p)).collect();
360 let excludes: Vec<WildMatch> = opts.exclude.iter().map(|p| WildMatch::new(p)).collect();
361
362 let delimiter = get_hierarchy_delimiter(&names)?;
363 let remote_mailboxes_with_state = state::load_remote_mailboxes(&names);
364 let remote_mailboxes: BTreeSet<&String> = remote_mailboxes_with_state.keys().collect();
365 let local_mailboxes_with_state = measure!(
366 format!("Loaded local mailboxes"),
367 state::load_local_mailboxes(&opts.uid, &opts.local, &delimiter)?
368 );
369 let local_mailboxes: BTreeSet<&String> = local_mailboxes_with_state.keys().collect();
370
371 trace!("local: {:?}", local_mailboxes);
372 trace!("remote: {:?}", remote_mailboxes);
373
374 let only_local: BTreeSet<&String> = local_mailboxes
375 .difference(&remote_mailboxes)
376 .cloned()
377 .collect();
378 trace!("Only local: {:?}", only_local);
379 let only_remote: BTreeSet<&String> = remote_mailboxes
380 .difference(&local_mailboxes)
381 .cloned()
382 .collect();
383 trace!("Only remote: {:?}", only_remote);
384 let sync_jobs: Vec<state::SyncJob> = local_mailboxes
385 .union(&remote_mailboxes)
386 .cloned()
387 .filter_map(|name| {
388 if !includes.is_empty() && !includes.iter().any(|e| e.matches(name)) {
390 trace!("Skipping {} due to include filter", name);
391 return None;
392 }
393 if excludes.iter().any(|e| e.matches(name)) {
394 trace!("Skipping {} due to exclude filter", name);
395 return None;
396 }
397 trace!("Processing mailbox {}", name);
398 let delimiter = delimiter.clone();
399
400 let name = name.clone();
403 if only_local.contains(&name) {
404 match direction {
405 SyncDirection::Pull => {
406 SyncJob::new(name, delimiter, SyncAction::DeleteLocal)
408 }
409 SyncDirection::Push => {
410 SyncJob::new(name, delimiter, SyncAction::CreateRemote)
412 }
413 SyncDirection::TwoWay => {
414 let dirname = state::mailbox_to_dirname(&name, &delimiter);
415 if root_state
416 .contains_subdir(&dirname)
417 .expect("failed to inspect current state")
418 {
419 SyncJob::new(name, delimiter, SyncAction::DeleteLocal)
422 } else {
423 SyncJob::new(name, delimiter, SyncAction::CreateRemote)
426 }
427 }
428 }
429 } else if only_remote.contains(&name) {
430 match direction {
431 SyncDirection::Pull => {
432 SyncJob::new(name, delimiter, SyncAction::CreateLocal)
434 }
435 SyncDirection::Push => {
436 SyncJob::new(name, delimiter, SyncAction::DeleteRemote)
438 }
439 SyncDirection::TwoWay => {
440 let dirname = state::mailbox_to_dirname(&name, &delimiter);
441 if root_state
442 .contains_subdir(&dirname)
443 .expect("failed to inspect current state")
444 {
445 SyncJob::new(name, delimiter, SyncAction::DeleteRemote)
448 } else {
449 SyncJob::new(name, delimiter, SyncAction::CreateLocal)
452 }
453 }
454 }
455 } else {
456 let state = local_mailboxes_with_state.get(&name).unwrap();
458 let modseq = remote_mailboxes_with_state.get(&name).unwrap();
459 if !state.has_local_changes() && state.last_seen_highest_mod_seq() == *modseq {
460 trace!("Skipping {} because HIGHESTMODSEQ is in sync", name);
461 return None;
462 }
463 SyncJob::new(name, delimiter, state::SyncAction::Sync)
464 }
465 })
466 .collect();
467
468 if opts.list_mailbox_actions {
469 info!("The following actions would be performed:");
470 for job in sync_jobs {
471 info!(" {}: {:?}", job.name, job.action);
472 }
473 return Ok(());
474 }
475
476 let bail = sync_jobs
477 .iter()
478 .map(|job| match job.action {
479 state::SyncAction::DeleteLocal => {
480 warn!("About to delete local mailbox {}", job.name);
481 true
482 }
483 state::SyncAction::DeleteRemote => {
484 warn!("About to delete remote mailbox {}", job.name);
485 true
486 }
487 _ => false,
488 })
489 .any(|b| b);
490 if bail && !opts.force {
491 return Err(Error::DangerousAction());
492 }
493
494 let thread_count = if sync_jobs.len() < opts.threads.into() {
495 sync_jobs.len().try_into().unwrap()
497 } else {
498 opts.threads
499 };
500
501 let (mut tx_work, rx_work) = spmc::channel::<state::SyncJob>();
502 let (tx_result, rx_result) = mpsc::channel::<state::SyncJob>();
503
504 info!(
505 "Using {} threads to sync {} mailboxes",
506 thread_count,
507 sync_jobs.len()
508 );
509
510 let mut sess = iter::once(imap_session);
511
512 let threads: Vec<_> = (0..thread_count)
513 .map(|i| {
514 let rx = rx_work.clone();
515 let tx = tx_result.clone();
516 let opts = (*opts).clone();
517 let dir = direction.clone();
518 let session = sess.next();
519 let abort = Arc::clone(&abort);
520
521 thread::spawn(move || {
522 if let Err(e) = worker_thread(i, &opts, dir, rx, tx, session, abort) {
523 error!("Error in worker thread {}: {}", i, e);
524 };
525 })
526 })
527 .collect();
528
529 let expected_results = sync_jobs.len();
530 let mut received_results = 0usize;
531
532 for job in sync_jobs {
533 tx_work.send(job)?;
534 }
535
536 drop(tx_work);
537 drop(tx_result);
538
539 let mut success = true;
540
541 while let Ok(job) = rx_result.recv() {
542 trace!("root state update: {:?} {}", job.action, job.name);
544 success &= job.success;
545 received_results += 1;
546
547 root_state.sync_done(job)?;
548 }
549
550 for t in threads {
551 t.join().unwrap();
552 }
553
554 if success && (expected_results == received_results) {
555 info!("Sync successful");
556 Ok(())
557 } else {
558 error!("Sync failed!");
559 Err(Error::SyncIncomplete())
560 }
561}
562
563pub fn pull(opts: &SyncOptions, abort: Arc<AtomicBool>) -> Result<(), Error> {
585 measure!(
586 format!("Pulled from {}", opts.remote),
587 sync_(opts, SyncDirection::Pull, abort)
588 )
589}
590
591pub fn push(opts: &SyncOptions, abort: Arc<AtomicBool>) -> Result<(), Error> {
613 measure!(
614 format!("Pushed to {}", opts.remote),
615 sync_(opts, SyncDirection::Push, abort)
616 )
617}
618
619pub fn sync(opts: &SyncOptions, abort: Arc<AtomicBool>) -> Result<(), Error> {
642 measure!(
643 format!("Synced with {}", opts.remote),
644 sync_(opts, SyncDirection::TwoWay, abort)
645 )
646}