1use imap::extensions::list_status::ExtendedNames;
23use imap::ClientBuilder;
24use imap::ImapConnection;
25use imap::Session;
26use log::warn;
27use log::{debug, error, info, trace};
28use std::collections::BTreeSet;
29use std::fs;
30use std::io;
31use std::iter;
32use std::iter::Iterator;
33use std::path::Path;
34use std::path::PathBuf;
35use std::sync::atomic::AtomicBool;
36use std::sync::atomic::Ordering;
37use std::sync::mpsc;
38use std::sync::Arc;
39use std::thread;
40use std::time::Instant;
41use wildmatch::WildMatch;
42
43use crate::state::SyncAction;
44use crate::state::SyncJob;
45
46mod flags;
47mod mailboxes;
48mod seqset;
49mod state;
50mod sync;
51
52#[derive(thiserror::Error, Debug)]
53pub enum Error {
54 #[error("IMAP error: {0}")]
55 IMAP(#[from] imap::Error),
56 #[error("no UIDVALIDITY provided")]
57 MissingUIDValidity(),
58 #[error("no HIGHESTMODSEQ provided")]
59 MissingHMS(),
60 #[error("APPEND did not return any UIDs")]
61 InvalidAppendResponse(),
62 #[error("server did not send UID, giving up")]
63 MissingUID(),
64 #[error("downloaded message has no body")]
65 MissingBody(),
66 #[error("invalid configuration: {0}")]
67 Config(&'static str),
68 #[error("refused to perform dangerous action - set 'force' option to override")]
69 DangerousAction(),
70 #[error("server does not support {0}")]
71 MissingCapability(&'static str),
72 #[error("found multiple hierarchy delimiters")]
73 MultipleHierarchyDelimiters(),
74 #[error("no remote mailboxes found")]
75 NoRemoteMailboxes(),
76 #[error("UID validity change with pending local changes makes resynchronization impossible")]
77 UIDValidityChangeWithLocalChanges(),
78 #[error("not all mailboxes were synced successfully")]
79 SyncIncomplete(),
80 #[error("failed to load state: {0}")]
81 State(#[from] state::Error),
82 #[error("IPC error: {0}")]
83 IPC(#[from] spmc::SendError<state::SyncJob>),
84 #[error("IO error: {0}")]
85 IO(#[from] io::Error),
86 #[error("interrupted")]
87 Interrupted(),
88}
89
90#[derive(Clone, Debug, PartialEq, Eq)]
91pub enum SyncDirection {
92 Pull,
93 Push,
94 TwoWay,
95}
96
97#[derive(Clone, Debug)]
99pub struct SyncOptions {
100 pub uid: String,
104 pub local: String,
106 pub remote: String,
108 pub user: String,
110 pub password: String,
112 pub threads: u8,
117 pub unsafe_tls: bool,
119 pub disable_tls: bool,
121 pub list_mailbox_actions: bool,
124 pub include: Vec<String>,
128 pub exclude: Vec<String>,
133 pub force: bool,
135}
136
137macro_rules! measure {
138 ( $m:expr, $x:expr ) => {{
139 let start = Instant::now();
140 let result = $x;
141 let duration = start.elapsed();
142 debug!("{} in {}ms", $m, duration.as_millis());
143 result
144 }};
145}
146
147const DATA_ITEMS_HMS: &str = "(HIGHESTMODSEQ)";
148
149fn get_hierarchy_delimiter(names: &ExtendedNames) -> Result<String, Error> {
150 let delims: BTreeSet<&str> = names
151 .iter()
152 .filter_map(|(name, _)| name.delimiter())
153 .collect();
154 if delims.len() != 1 {
155 return Err(Error::MultipleHierarchyDelimiters());
156 }
157 Ok(String::from(delims.into_iter().next().unwrap()))
158}
159
160fn new_session(opts: &SyncOptions) -> Result<Session<Box<dyn ImapConnection>>, Error> {
161 let (host, port) = match opts.remote.rsplit_once(':') {
162 Some((host, port)) => (host, port),
163 None => (opts.remote.as_str(), "993"),
164 };
165
166 let port = match port.parse() {
167 Ok(p) => p,
168 Err(e) => {
169 error!("failed to parse remote port: {}", e);
170 return Err(Error::Config("invalid port"));
171 }
172 };
173
174 debug!("Connecting to {}:{}", host, port);
175
176 let client = if opts.disable_tls {
177 ClientBuilder::new(host, port)
178 .mode(imap::ConnectionMode::Plaintext)
179 .connect()?
180 } else if opts.unsafe_tls {
181 ClientBuilder::new(host, port)
182 .danger_skip_tls_verify(true)
183 .connect()?
184 } else {
185 ClientBuilder::new(host, port).connect()?
186 };
187
188 debug!("Logging in as {}", &opts.user);
189 let mut session = client.login(&opts.user, &opts.password).map_err(|e| e.0)?;
190
191 session.run_command_and_read_response("ENABLE QRESYNC")?;
192
193 Ok(session)
194}
195
196fn check_server_capabilities_(opts: &SyncOptions) -> Result<(), Error> {
197 let mut session = new_session(opts)?;
198 let caps = session.capabilities()?;
199 for cap in caps.iter() {
200 trace!("Server capability: {:?}", cap);
201 }
202 session.logout()?;
203 if !caps.has_str("QRESYNC") {
204 return Err(Error::MissingCapability("QRESYNC (RFC 7162)"));
205 }
206 if !caps.has_str("UIDPLUS") {
207 return Err(Error::MissingCapability("UIDPLUS (RFC 4315)"));
208 }
209 if !caps.has_str("LIST-STATUS") {
210 return Err(Error::MissingCapability("LIST-STATUS (RFC 5819)"));
211 }
212 Ok(())
213}
214
215pub fn list_mailboxes(opts: &SyncOptions) -> Result<(), Error> {
220 let mut session = new_session(opts)?;
221 let names = session.list_status(None, Some("*"), DATA_ITEMS_HMS)?;
222 if names.is_empty() {
223 return Err(Error::NoRemoteMailboxes());
224 }
225
226 let delimiter = get_hierarchy_delimiter(&names)?;
227 let remote_mailboxes = state::load_remote_mailboxes(&names);
228 let local_mailboxes = state::load_local_mailboxes(&opts.uid, &opts.local, &delimiter)?;
229
230 info!("Hierarchy delimiter: {}", delimiter);
231 info!("Remote mailboxes:");
232 for m in remote_mailboxes.keys() {
233 info!(" {}", m);
234 }
235 info!("Local mailboxes:");
236 for m in local_mailboxes.keys() {
237 info!(" {}", m);
238 }
239 Ok(())
240}
241
242pub fn check_server_capabilities(opts: &SyncOptions) -> Result<(), Error> {
248 measure!(
249 format!("Checked capabilities for {}", opts.remote),
250 check_server_capabilities_(opts)
251 )
252}
253
254fn sync_mailbox<T: io::Read + io::Write>(
255 state_id: &str,
256 m2store: impl AsRef<Path>,
257 sync_job: &state::SyncJob,
258 direction: &SyncDirection,
259 session: &mut Session<T>,
260 abort: &Arc<AtomicBool>,
261) -> Result<(), Error> {
262 let mailbox = &sync_job.name;
263 let dirname = state::mailbox_to_dirname(mailbox, &sync_job.delimiter);
264 let mbpath: PathBuf = [m2store.as_ref(), dirname.as_ref()].iter().collect();
265
266 match sync_job.action {
267 state::SyncAction::Sync => (),
268 state::SyncAction::CreateLocal => {
269 trace!("Creating local mailbox {}", mailbox);
270 }
272 state::SyncAction::CreateRemote => {
273 trace!("Creating remote mailbox {}", mailbox);
274 session.create(mailbox)?
275 }
276 state::SyncAction::DeleteLocal => {
277 trace!("Deleting local mailbox {}", mailbox);
278 return fs::remove_dir_all(&mbpath).map_err(Error::IO);
279 }
280 state::SyncAction::DeleteRemote => {
281 trace!("Deleting remote mailbox {}", mailbox);
282 return session.delete(mailbox).map_err(Error::IMAP);
283 }
284 };
285
286 let state = measure!(
287 format!("Loaded state for {}", mailbox),
288 state::SyncState::load(&mbpath, state_id)?
289 );
290
291 let remote = session.select(mailbox)?;
292
293 match direction {
294 SyncDirection::Pull => sync::pull(session, sync_job, remote, state, abort),
295 SyncDirection::Push => sync::push(session, sync_job, remote, state, abort),
296 SyncDirection::TwoWay => sync::sync(session, sync_job, remote, state, abort),
297 }
298}
299
300fn worker_thread(
301 i: u8,
302 opts: &SyncOptions,
303 direction: SyncDirection,
304 rx: spmc::Receiver<state::SyncJob>,
305 tx: mpsc::Sender<state::SyncJob>,
306 session: Option<Session<Box<dyn ImapConnection>>>,
307 abort: Arc<AtomicBool>,
308) -> Result<(), Error> {
309 let mut imap_session = match session {
310 Some(s) => s,
311 None => new_session(opts)?,
312 };
313
314 while let Ok(mut job) = rx.recv() {
315 if abort.load(Ordering::Relaxed) {
316 warn!("Worker thread {} exiting - interrupted", i);
317 break;
318 };
319 trace!("Syncing {} in worker thread {}", job.name, i);
320 match measure!(
321 format!("Synced {}", job.name),
322 sync_mailbox(
323 &opts.uid,
324 &opts.local,
325 &job,
326 &direction,
327 &mut imap_session,
328 &abort
329 )
330 ) {
331 Ok(_) => job.success = true,
332 Err(e) => error!("Error syncing {}: {}", job.name, e),
333 }
334 tx.send(job)?;
335 }
336 drop(tx);
337 imap_session.logout()?;
338 Ok(())
339}
340
341fn sync_(
342 opts: &SyncOptions,
343 direction: SyncDirection,
344 abort: Arc<AtomicBool>,
345) -> Result<(), Error> {
346 info!("Syncing from {} to {}", opts.remote, opts.local);
347
348 fs::create_dir_all(&opts.local)?;
349
350 let root_state = state::RootState::load(&opts.local, &opts.uid)?;
351
352 let mut imap_session = new_session(opts)?;
353
354 let names = imap_session.list_status(None, Some("*"), DATA_ITEMS_HMS)?;
355
356 if names.is_empty() {
357 return Err(Error::NoRemoteMailboxes());
358 }
359
360 let includes: Vec<WildMatch> = opts.include.iter().map(|p| WildMatch::new(p)).collect();
361 let excludes: Vec<WildMatch> = opts.exclude.iter().map(|p| WildMatch::new(p)).collect();
362
363 let delimiter = get_hierarchy_delimiter(&names)?;
364 let remote_mailboxes_with_state = state::load_remote_mailboxes(&names);
365 let remote_mailboxes: BTreeSet<&String> = remote_mailboxes_with_state.keys().collect();
366 let local_mailboxes_with_state = measure!(
367 format!("Loaded local mailboxes"),
368 state::load_local_mailboxes(&opts.uid, &opts.local, &delimiter)?
369 );
370 let local_mailboxes: BTreeSet<&String> = local_mailboxes_with_state.keys().collect();
371
372 trace!("local: {:?}", local_mailboxes);
373 trace!("remote: {:?}", remote_mailboxes);
374
375 let only_local: BTreeSet<&String> = local_mailboxes
376 .difference(&remote_mailboxes)
377 .cloned()
378 .collect();
379 trace!("Only local: {:?}", only_local);
380 let only_remote: BTreeSet<&String> = remote_mailboxes
381 .difference(&local_mailboxes)
382 .cloned()
383 .collect();
384 trace!("Only remote: {:?}", only_remote);
385 let sync_jobs: Vec<state::SyncJob> = local_mailboxes
386 .union(&remote_mailboxes)
387 .cloned()
388 .filter_map(|name| {
389 if !includes.is_empty() && !includes.iter().any(|e| e.matches(name)) {
391 trace!("Skipping {} due to include filter", name);
392 return None;
393 }
394 if excludes.iter().any(|e| e.matches(name)) {
395 trace!("Skipping {} due to exclude filter", name);
396 return None;
397 }
398 trace!("Processing mailbox {}", name);
399 let delimiter = delimiter.clone();
400
401 let name = name.clone();
404 if only_local.contains(&name) {
405 match direction {
406 SyncDirection::Pull => {
407 SyncJob::new(name, delimiter, SyncAction::DeleteLocal)
409 }
410 SyncDirection::Push => {
411 SyncJob::new(name, delimiter, SyncAction::CreateRemote)
413 }
414 SyncDirection::TwoWay => {
415 let dirname = state::mailbox_to_dirname(&name, &delimiter);
416 if root_state
417 .contains_subdir(&dirname)
418 .expect("failed to inspect current state")
419 {
420 SyncJob::new(name, delimiter, SyncAction::DeleteLocal)
423 } else {
424 SyncJob::new(name, delimiter, SyncAction::CreateRemote)
427 }
428 }
429 }
430 } else if only_remote.contains(&name) {
431 match direction {
432 SyncDirection::Pull => {
433 SyncJob::new(name, delimiter, SyncAction::CreateLocal)
435 }
436 SyncDirection::Push => {
437 SyncJob::new(name, delimiter, SyncAction::DeleteRemote)
439 }
440 SyncDirection::TwoWay => {
441 let dirname = state::mailbox_to_dirname(&name, &delimiter);
442 if root_state
443 .contains_subdir(&dirname)
444 .expect("failed to inspect current state")
445 {
446 SyncJob::new(name, delimiter, SyncAction::DeleteRemote)
449 } else {
450 SyncJob::new(name, delimiter, SyncAction::CreateLocal)
453 }
454 }
455 }
456 } else {
457 let state = local_mailboxes_with_state.get(&name).unwrap();
459 let modseq = remote_mailboxes_with_state.get(&name).unwrap();
460 if !state.has_local_changes() && state.last_seen_highest_mod_seq() == *modseq {
461 trace!("Skipping {} because HIGHESTMODSEQ is in sync", name);
462 return None;
463 }
464 SyncJob::new(name, delimiter, state::SyncAction::Sync)
465 }
466 })
467 .collect();
468
469 if opts.list_mailbox_actions {
470 info!("The following actions would be performed:");
471 for job in sync_jobs {
472 info!(" {}: {:?}", job.name, job.action);
473 }
474 return Ok(());
475 }
476
477 let bail = sync_jobs.iter().any(|job| match job.action {
478 state::SyncAction::DeleteLocal => {
479 warn!("About to delete local mailbox {}", job.name);
480 true
481 }
482 state::SyncAction::DeleteRemote => {
483 warn!("About to delete remote mailbox {}", job.name);
484 true
485 }
486 _ => false,
487 });
488 if bail && !opts.force {
489 return Err(Error::DangerousAction());
490 }
491
492 let thread_count = if sync_jobs.len() < opts.threads.into() {
493 sync_jobs.len().try_into().unwrap()
495 } else {
496 opts.threads
497 };
498
499 let (mut tx_work, rx_work) = spmc::channel::<state::SyncJob>();
500 let (tx_result, rx_result) = mpsc::channel::<state::SyncJob>();
501
502 info!(
503 "Using {} threads to sync {} mailboxes",
504 thread_count,
505 sync_jobs.len()
506 );
507
508 let mut sess = iter::once(imap_session);
509
510 let threads: Vec<_> = (0..thread_count)
511 .map(|i| {
512 let rx = rx_work.clone();
513 let tx = tx_result.clone();
514 let opts = (*opts).clone();
515 let dir = direction.clone();
516 let session = sess.next();
517 let abort = Arc::clone(&abort);
518
519 thread::spawn(move || {
520 if let Err(e) = worker_thread(i, &opts, dir, rx, tx, session, abort) {
521 error!("Error in worker thread {}: {}", i, e);
522 };
523 })
524 })
525 .collect();
526
527 let expected_results = sync_jobs.len();
528 let mut received_results = 0usize;
529
530 for job in sync_jobs {
531 tx_work.send(job)?;
532 }
533
534 drop(tx_work);
535 drop(tx_result);
536
537 let mut success = true;
538
539 while let Ok(job) = rx_result.recv() {
540 trace!("root state update: {:?} {}", job.action, job.name);
542 success &= job.success;
543 received_results += 1;
544
545 root_state.sync_done(job)?;
546 }
547
548 for t in threads {
549 t.join().unwrap();
550 }
551
552 if success && (expected_results == received_results) {
553 info!("Sync successful");
554 Ok(())
555 } else {
556 error!("Sync failed!");
557 Err(Error::SyncIncomplete())
558 }
559}
560
561pub fn pull(opts: &SyncOptions, abort: Arc<AtomicBool>) -> Result<(), Error> {
583 measure!(
584 format!("Pulled from {}", opts.remote),
585 sync_(opts, SyncDirection::Pull, abort)
586 )
587}
588
589pub fn push(opts: &SyncOptions, abort: Arc<AtomicBool>) -> Result<(), Error> {
611 measure!(
612 format!("Pushed to {}", opts.remote),
613 sync_(opts, SyncDirection::Push, abort)
614 )
615}
616
617pub fn sync(opts: &SyncOptions, abort: Arc<AtomicBool>) -> Result<(), Error> {
640 measure!(
641 format!("Synced with {}", opts.remote),
642 sync_(opts, SyncDirection::TwoWay, abort)
643 )
644}