1use imap::extensions::list_status::ExtendedNames;
21use imap::ClientBuilder;
22use imap::Session;
23use log::{debug, error, info, trace};
24use maildir::Maildir;
25use std::collections::BTreeSet;
26use std::fs;
27use std::io;
28use std::iter::Iterator;
29use std::path::PathBuf;
30use std::sync::mpsc;
31use std::thread;
32use std::time::Instant;
33use thiserror::Error;
34use vomit::{Mailbox, VomitError};
35use wildmatch::WildMatch;
36
37mod flags;
38mod mailboxes;
39mod seqset;
40mod state;
41mod sync;
42
43#[derive(Error, Debug)]
44pub enum SyncError {
45 #[error("IMAP error: {0}")]
46 ProtocolError(#[from] imap::Error),
47 #[error("invalid configuration: {0}")]
48 ConfigError(&'static str),
49 #[error("Refused to perform dangerous action - set 'force' option to override")]
50 DangerousActionError(),
51 #[error("failed to load state: {0}")]
52 StateError(#[from] state::StateError),
53 #[error("IPC error: {0}")]
54 IPCError(#[from] spmc::SendError<mailboxes::SyncJob>),
55 #[error("error accessing maildir: {0}")]
56 IOError(#[from] io::Error),
57 #[error("error managing maildir: {0}")]
58 MaildirError(#[from] maildir::MaildirError),
59 #[error("{0}")]
60 MailboxesError(#[from] mailboxes::MailboxesError),
61 #[error("error in maildir abstraction library: {0}")]
62 VomitError(#[from] VomitError),
63 #[error("{0}")]
64 Error(&'static str),
65 #[error("{0}")]
66 E(String),
67}
68
69#[derive(Clone, Debug, PartialEq, Eq)]
70pub enum SyncDirection {
71 Pull,
72 Push,
73 TwoWay,
74}
75
76#[derive(Clone, Debug)]
78pub struct SyncOptions {
79 pub local: String,
81 pub remote: String,
83 pub user: String,
85 pub password: String,
87 pub threads: u8,
92 pub unsafe_tls: bool,
94 pub disable_tls: bool,
96 pub list_mailbox_actions: bool,
99 pub include: Vec<String>,
103 pub exclude: Vec<String>,
108 pub force: bool,
110}
111
112macro_rules! measure {
113 ( $m:expr, $x:expr ) => {{
114 let start = Instant::now();
115 let result = $x;
116 let duration = start.elapsed();
117 debug!("{} in {}ms", $m, duration.as_millis());
118 result
119 }};
120}
121
122const DATA_ITEMS_HMS: &str = "(HIGHESTMODSEQ)";
123
124fn get_hierarchy_delimiter(names: &ExtendedNames) -> Result<String, SyncError> {
125 let delims: BTreeSet<&str> = names
126 .iter()
127 .filter_map(|(name, _)| name.delimiter())
128 .collect();
129 if delims.len() != 1 {
130 return Err(SyncError::E(format!(
131 "Expected exactly on hierarchy delimiter, found {:?}",
132 delims
133 )));
134 }
135 Ok(String::from(delims.into_iter().next().unwrap()))
136}
137
138fn new_session(
139 opts: &SyncOptions,
140) -> Result<Session<impl std::io::Read + std::io::Write>, SyncError> {
141 let (host, port) = match opts.remote.rsplit_once(':') {
142 Some((host, port)) => (host, port),
143 None => (opts.remote.as_str(), "993"),
144 };
145
146 let port = match port.parse() {
147 Ok(p) => p,
148 Err(e) => {
149 error!("failed to parse remote port: {}", e);
150 return Err(SyncError::ConfigError("invalid port"));
151 }
152 };
153
154 debug!("Connecting to {}:{}", host, port);
155
156 let client = if opts.disable_tls {
157 ClientBuilder::new(host, port)
158 .mode(imap::ConnectionMode::Plaintext)
159 .connect()?
160 } else if opts.unsafe_tls {
161 ClientBuilder::new(host, port)
162 .danger_skip_tls_verify(true)
163 .connect()?
164 } else {
165 ClientBuilder::new(host, port).connect()?
166 };
167
168 debug!("Logging in as {}", &opts.user);
169 let mut session = client.login(&opts.user, &opts.password).map_err(|e| e.0)?;
170
171 session.run_command_and_read_response("ENABLE QRESYNC")?;
172
173 Ok(session)
174}
175
176fn check_server_capabilities_(opts: &SyncOptions) -> Result<(), SyncError> {
177 let mut session = new_session(opts)?;
178 let caps = session.capabilities()?;
179 for cap in caps.iter() {
180 trace!("Server capability: {:?}", cap);
181 }
182 session.logout()?;
183 if !caps.has_str("QRESYNC") {
184 return Err(SyncError::E(
185 "Server does not support QRESYNC (RFC 7162)".to_string(),
186 ));
187 }
188 if !caps.has_str("UIDPLUS") {
189 return Err(SyncError::E(
190 "Server does not support UIDPLUS (RFC 4315)".to_string(),
191 ));
192 }
193 if !caps.has_str("LIST-STATUS") {
194 return Err(SyncError::E(
195 "Server does not support LIST-STATUS (RFC 5819)".to_string(),
196 ));
197 }
198 Ok(())
199}
200
201pub fn list_mailboxes(opts: &SyncOptions) -> Result<(), SyncError> {
206 let mut session = new_session(opts)?;
207 let names = session.list_status(None, Some("*"), DATA_ITEMS_HMS)?;
208 if names.is_empty() {
209 return Err(SyncError::Error("No remote mailboxes found"));
210 }
211
212 let delimiter = get_hierarchy_delimiter(&names)?;
213 let remote_mailboxes = mailboxes::load_remote(&names);
214 let local_mailboxes = mailboxes::load_local(&opts.local, &delimiter)?;
215
216 info!("Hierarchy delimiter: {}", delimiter);
217 info!("Remote mailboxes:");
218 for m in remote_mailboxes.keys() {
219 info!(" {}", m);
220 }
221 info!("Local mailboxes:");
222 for m in local_mailboxes.keys() {
223 info!(" {}", m);
224 }
225 Ok(())
226}
227
228pub fn check_server_capabilities(opts: &SyncOptions) -> Result<(), SyncError> {
234 measure!(
235 format!("Checked capabilities for {}", opts.remote),
236 check_server_capabilities_(opts)
237 )
238}
239
240fn sync_mailbox<T: io::Read + io::Write>(
241 maildir_root: &str,
242 sync_job: &mailboxes::SyncJob,
243 direction: &SyncDirection,
244 session: &mut Session<T>,
245) -> Result<(), SyncError> {
246 let mailbox = &sync_job.name;
247 let dirname = Mailbox::virtual_to_dir(mailbox, &sync_job.delimiter);
248 let mbpath: PathBuf = [maildir_root, &dirname].iter().collect();
249
250 match sync_job.action {
251 mailboxes::SyncAction::Sync => (),
252 mailboxes::SyncAction::CreateLocal => {
253 trace!("Creating local mailbox {}", mailbox);
254 }
256 mailboxes::SyncAction::CreateRemote => {
257 trace!("Creating remote mailbox {}", mailbox);
258 session.create(mailbox)?
259 }
260 mailboxes::SyncAction::DeleteLocal => {
261 trace!("Deleting local mailbox {}", mailbox);
262 return fs::remove_dir_all(&mbpath).map_err(SyncError::IOError);
263 }
264 mailboxes::SyncAction::DeleteRemote => {
265 trace!("Deleting remote mailbox {}", mailbox);
266 return session.delete(mailbox).map_err(SyncError::ProtocolError);
267 }
268 };
269
270 let maildir = Maildir::from(mbpath);
271 maildir.create_dirs()?;
272
273 let state = measure!(
274 format!("Loaded state for {}", mailbox),
275 state::SyncState::load(&maildir.path())?
276 );
277
278 let remote = session.select(mailbox)?;
279
280 match direction {
281 SyncDirection::Pull => sync::pull(session, sync_job, maildir, remote, state),
282 SyncDirection::Push => sync::push(session, sync_job, maildir, remote, state),
283 SyncDirection::TwoWay => sync::sync(session, sync_job, maildir, remote, state),
284 }
285 .map_err(|e| SyncError::E(format!("{}: {}", mailbox, e)))
286}
287
288fn worker_thread(
289 i: u8,
290 opts: &SyncOptions,
291 direction: SyncDirection,
292 rx: spmc::Receiver<mailboxes::SyncJob>,
293 tx: mpsc::Sender<mailboxes::SyncJob>,
294) -> Result<(), SyncError> {
295 let mut imap_session = new_session(opts)?;
296 while let Ok(job) = rx.recv() {
297 trace!("Syncing {} in worker thread {}", job.name, i);
298 measure!(
299 format!("Synced {}", job.name),
300 sync_mailbox(&opts.local, &job, &direction, &mut imap_session)?
301 );
302 tx.send(job)?;
303 }
304 drop(tx);
305 imap_session.logout()?;
306 Ok(())
307}
308
309fn root_state_thread(
310 root_dir: String,
311 rx_results: mpsc::Receiver<mailboxes::SyncJob>,
312) -> Result<(), SyncError> {
313 trace!("root state thread managing state in {}", root_dir);
314 let mut root_state = state::RootState::load(&root_dir)?;
315 while let Ok(job) = rx_results.recv() {
316 trace!("root state update: {:?} {}", job.action, job.name);
318 mailboxes::sync_done(job, &mut root_state);
319 root_state.save()?;
320 }
321 Ok(())
322}
323
324fn sync_(opts: &SyncOptions, direction: SyncDirection) -> Result<(), SyncError> {
325 info!("Syncing from {} to {}", opts.remote, opts.local);
326
327 fs::create_dir_all(&opts.local)?;
328
329 let root_state = state::RootState::load(&opts.local)?;
330
331 let mut imap_session = new_session(opts)?;
332
333 let names = imap_session.list_status(None, Some("*"), DATA_ITEMS_HMS)?;
334
335 if names.is_empty() {
336 return Err(SyncError::Error("No remote mailboxes found, giving up"));
337 }
338
339 let includes: Vec<WildMatch> = opts.include.iter().map(|p| WildMatch::new(p)).collect();
340 let excludes: Vec<WildMatch> = opts.exclude.iter().map(|p| WildMatch::new(p)).collect();
341
342 let delimiter = get_hierarchy_delimiter(&names)?;
343 let remote_mailboxes_with_state = mailboxes::load_remote(&names);
344 let remote_mailboxes: BTreeSet<&String> = remote_mailboxes_with_state.keys().collect();
345 let local_mailboxes_with_state = measure!(
346 format!("Loaded local mailboxes"),
347 mailboxes::load_local(&opts.local, &delimiter)?
348 );
349 let local_mailboxes: BTreeSet<&String> = local_mailboxes_with_state.keys().collect();
350
351 trace!("local: {:?}", local_mailboxes);
352 trace!("remote: {:?}", remote_mailboxes);
353
354 let only_local: BTreeSet<&String> = local_mailboxes
355 .difference(&remote_mailboxes)
356 .cloned()
357 .collect();
358 trace!("Only local: {:?}", only_local);
359 let only_remote: BTreeSet<&String> = remote_mailboxes
360 .difference(&local_mailboxes)
361 .cloned()
362 .collect();
363 trace!("Only remote: {:?}", only_remote);
364 let sync_jobs: Vec<mailboxes::SyncJob> = local_mailboxes
365 .union(&remote_mailboxes)
366 .cloned()
367 .filter_map(|name| {
368 if !includes.is_empty() && !includes.iter().any(|e| e.matches(name)) {
370 trace!("Skipping {} due to include filter", name);
371 return None;
372 }
373 if excludes.iter().any(|e| e.matches(name)) {
374 trace!("Skipping {} due to exclude filter", name);
375 return None;
376 }
377 trace!("Processing mailbox {}", name);
378 let delimiter = delimiter.clone();
379
380 let name = name.clone();
384 if only_local.contains(&name) {
385 match direction {
386 SyncDirection::Pull => {
387 mailboxes::sync_delete_local(name, delimiter)
389 }
390 SyncDirection::Push => {
391 mailboxes::sync_create_remote(name, delimiter)
393 }
394 SyncDirection::TwoWay => {
395 let dirname = Mailbox::virtual_to_dir(&name, &delimiter);
396 if root_state.subdirs.contains(&dirname) {
397 mailboxes::sync_delete_local(name, delimiter)
400 } else {
401 mailboxes::sync_create_remote(name, delimiter)
404 }
405 }
406 }
407 } else if only_remote.contains(&name) {
408 match direction {
409 SyncDirection::Pull => {
410 mailboxes::sync_create_local(name, delimiter)
412 }
413 SyncDirection::Push => {
414 mailboxes::sync_delete_remote(name, delimiter)
416 }
417 SyncDirection::TwoWay => {
418 let dirname = Mailbox::virtual_to_dir(&name, &delimiter);
419 if root_state.subdirs.contains(&dirname) {
420 mailboxes::sync_delete_remote(name, delimiter)
423 } else {
424 mailboxes::sync_create_local(name, delimiter)
427 }
428 }
429 }
430 } else {
431 let state = local_mailboxes_with_state.get(&name).unwrap();
433 let modseq = remote_mailboxes_with_state.get(&name).unwrap();
434 if !state.has_local_changes() && state.last_seen.highest_mod_seq == *modseq {
435 trace!("Skipping {} because HIGHESTMODSEQ is in sync", name);
436 return None;
437 }
438 mailboxes::sync(name, delimiter)
439 }
440 })
441 .collect();
442
443 if opts.list_mailbox_actions {
444 info!("The following actions would be performed:");
445 for job in sync_jobs {
446 info!(" {}: {:?}", job.name, job.action);
447 }
448 return Ok(());
449 }
450
451 let bail = sync_jobs
452 .iter()
453 .map(|job| match job.action {
454 mailboxes::SyncAction::DeleteLocal => {
455 error!("Refusing to delete local mailbox {}", job.name);
456 true
457 }
458 mailboxes::SyncAction::DeleteRemote => {
459 error!("Refusing to delete remote mailbox {}", job.name);
460 true
461 }
462 _ => false,
463 })
464 .any(|b| b);
465 if bail {
466 return Err(SyncError::DangerousActionError());
467 }
468
469 drop(root_state);
470
471 let thread_count = if sync_jobs.len() < opts.threads.into() {
472 sync_jobs.len().try_into().unwrap()
474 } else {
475 opts.threads
476 };
477
478 let mut threads = Vec::new();
479 let (mut tx_work, rx_work) = spmc::channel::<mailboxes::SyncJob>();
480 let (tx_result, rx_result) = mpsc::channel::<mailboxes::SyncJob>();
481
482 let root_dir = opts.local.clone();
483 let t = thread::spawn(move || {
484 if let Err(e) = root_state_thread(root_dir, rx_result) {
485 error!("Error in root state thread: {}", e);
486 };
487 });
488 threads.push(t);
489
490 info!(
491 "Using {} threads to sync {} mailboxes",
492 thread_count,
493 sync_jobs.len()
494 );
495 for i in 1..thread_count {
496 let rx = rx_work.clone();
497 let tx = tx_result.clone();
498 let opts = (*opts).clone();
499 let dir = direction.clone();
500
501 let t = thread::spawn(move || {
502 if let Err(e) = worker_thread(i, &opts, dir, rx, tx) {
503 error!("Error in worker thread {}: {}", i, e);
504 };
505 });
506 threads.push(t);
507 }
508 for job in sync_jobs {
509 tx_work.send(job)?;
510 }
511 drop(tx_work);
512
513 while let Ok(job) = rx_work.recv() {
514 debug!("Syncing {} in main thread ({:?})", job.name, job.action);
515 measure!(
516 format!("Synced {}", job.name),
517 sync_mailbox(&opts.local, &job, &direction, &mut imap_session)?
518 );
519 tx_result.send(job)?;
520 }
521 drop(tx_result);
522
523 _ = imap_session.logout();
524
525 for t in threads {
526 t.join().unwrap();
527 }
528
529 info!("Sync successful");
530 Ok(())
531}
532
533pub fn pull(opts: &SyncOptions) -> Result<(), SyncError> {
537 measure!(
538 format!("Pulled from {}", opts.remote),
539 sync_(opts, SyncDirection::Pull)
540 )
541}
542
543pub fn push(opts: &SyncOptions) -> Result<(), SyncError> {
547 measure!(
548 format!("Pushed to {}", opts.remote),
549 sync_(opts, SyncDirection::Push)
550 )
551}
552
553pub fn sync(opts: &SyncOptions) -> Result<(), SyncError> {
557 measure!(
558 format!("Synced with {}", opts.remote),
559 sync_(opts, SyncDirection::TwoWay)
560 )
561}