use crate::{
config::{IMAPConfig, PostarConfig},
inbox::{Folder, Inbox, Message, UIDRange},
migrations::MIGRATIONS,
};
use anyhow::Context;
use chumsky::prelude::any;
use imap::{
Session,
extensions::idle::SetReadTimeout,
types::{Fetch, ZeroCopy},
};
use log::{debug, info, warn};
use native_tls::TlsStream;
use rusqlite::{Connection, OptionalExtension, params};
use std::{
fs,
io::{Read, Write},
net::TcpStream,
path::Path,
thread,
time::{self, Duration},
};
#[derive(Debug, PartialEq, Eq)]
pub(super) enum InboxState {
Authenticated,
Selected,
}
#[derive(Debug)]
pub struct IMAPInbox<T: Read + Write> {
imap_session: Session<T>,
capabilities: InboxCapabilities,
pub(super) state: InboxState,
currently_selected_folder: Option<Folder>,
conn: Connection,
server_user_id: u16,
config: PostarConfig,
}
#[derive(Debug)]
struct InboxCapabilities {
has_move: bool,
has_idle: bool,
}
impl IMAPInbox<TlsStream<TcpStream>> {
pub fn from_config<T: AsRef<Path>>(
config: &PostarConfig,
server: &IMAPConfig,
db_path: T,
) -> anyhow::Result<Self> {
IMAPInbox::new_tls(
&server.server,
server.port,
&server.username,
&server.password,
server.self_signed_cert,
config,
db_path,
)
}
pub fn new_tls<T: AsRef<Path>>(
server: &str,
port: u16,
user: &str,
pass: &str,
use_self_signed_cert: bool,
config: &PostarConfig,
db_path: T,
) -> anyhow::Result<Self> {
let tls = native_tls::TlsConnector::builder()
.danger_accept_invalid_certs(use_self_signed_cert)
.build()?;
let client = {
let mut connection_attempts: u64 = 0;
let mut res = imap::connect((server, port), server, &tls)
.with_context(|| "Failed to connect to IMAP server");
while let Err(_) = res
&& connection_attempts < 3
{
connection_attempts += 1;
let sleep_time = (connection_attempts * 2).pow(2);
warn!(
"Connection attempt failed. Sleeping for {} seconds before retrying.",
sleep_time
);
thread::sleep(Duration::from_secs(sleep_time));
res = imap::connect((server, port), server, &tls)
.with_context(|| "Failed to connect to IMAP server");
}
res
}?;
let mut imap_session = client
.login(user, pass)
.map_err(|e| e.0)
.with_context(|| "Failed to login to IMAP")?;
let capabilities = imap_session
.capabilities()
.with_context(|| "Failed to fetch capabilities.")?;
if !capabilities.has_str("IMAP4rev1") {
return Err(anyhow::format_err!(
"The server doesn't advertise the IMAP4Rev1 capability that is needed for UID commands."
));
}
let mut conn = {
if let Some(db_parent) = db_path.as_ref().parent()
&& !db_parent.exists()
{
fs::create_dir_all(db_parent)?;
}
Connection::open(db_path).with_context(|| "Failed to open DB.")?
};
MIGRATIONS
.to_latest(&mut conn)
.with_context(|| "Failed to apply migrations to DB.")?;
{
let mut stmt =
conn.prepare("SELECT * FROM imap_servers WHERE server=?1 AND user=?2")?;
let mut server_res = stmt.query(params![server, user])?;
if let Ok(None) = server_res.next() {
conn.execute(
"INSERT INTO imap_servers (server, user) VALUES (?1, ?2)",
params![server, user],
)?;
}
}
let server_user_id = conn.query_one(
"SELECT id FROM imap_servers WHERE server=?1 AND user=?2",
params![server, user],
|row| row.get(0),
)?;
Ok(IMAPInbox {
imap_session,
capabilities: InboxCapabilities {
has_move: capabilities.has_str("MOVE"),
has_idle: capabilities.has_str("IDLE"),
},
state: InboxState::Authenticated,
currently_selected_folder: None,
conn,
server_user_id,
config: config.clone(),
})
}
}
impl<T: Read + Write + SetReadTimeout> IMAPInbox<T> {
fn ensure_selected(&mut self, folder: &Folder) -> anyhow::Result<()> {
match self.state {
InboxState::Selected => {
if self.currently_selected_folder.as_ref().unwrap() != folder {
self.select(folder)?;
}
}
InboxState::Authenticated => {
self.select(folder)?;
}
}
Ok(())
}
fn select(&mut self, folder: &Folder) -> anyhow::Result<()> {
let mailbox = self
.imap_session
.select(&folder.name)
.with_context(|| format!("Failed to select folder {}", folder.name))?;
let uid_validity: Option<u32> = self
.conn
.query_one(
"SELECT uid_validity FROM imap_folders WHERE server_id = ?1 AND name = ?2",
params![self.server_user_id, folder.name],
|row| row.get(0),
)
.optional()?;
let mailbox_validity = mailbox.uid_validity.ok_or(anyhow::format_err!(
"SELECT statement didn't return a UID VALIDITY"
))?;
let highest_uid = mailbox.uid_next.map(Ok).unwrap_or_else(|| {
let query = "*:*";
let fetch_results = self.imap_session.uid_fetch(query, "UID")?;
fetch_results
.iter()
.filter_map(|msg| msg.uid)
.max()
.ok_or_else(|| {
anyhow::format_err!("Cannot get highest UID: folder empty or UIDs missing.")
})
})?;
match uid_validity {
Some(uid_validity) => {
if uid_validity != mailbox_validity {
info!(
"Invalidating last_seen_uid for server {} folder {}",
self.server_user_id, folder.name
);
self.conn.execute("UPDATE imap_folders SET uid_validity=?1, last_seen_uid=?2 WHERE server_id = ?3 AND name = ?4", params![mailbox.uid_validity, self.server_user_id, folder.name, highest_uid])?;
}
}
None => {
self.conn.execute(
"INSERT INTO imap_folders (server_id, name, uid_validity, last_seen_uid) VALUES (?1, ?2, ?3, ?4)",
params![self.server_user_id, folder.name, mailbox_validity, highest_uid],
)?;
}
}
self.state = InboxState::Selected;
self.currently_selected_folder = Some(folder.clone());
Ok(())
}
fn get_last_seen_uid(&mut self, folder: &Folder) -> anyhow::Result<Option<u32>> {
let res: Option<u32> = self.conn.query_one(
"SELECT last_seen_uid FROM imap_folders WHERE server_id = ?1 AND name = ?2",
params![self.server_user_id, folder.name],
|row| row.get(0),
)?;
Ok(res)
}
fn close(&mut self) -> anyhow::Result<()> {
self.imap_session.close()?;
self.state = InboxState::Authenticated;
self.currently_selected_folder = None;
Ok(())
}
fn fetch_response_to_messages(
response: ZeroCopy<Vec<Fetch>>,
containing_folder: &Folder,
) -> Vec<Message> {
response
.into_iter()
.filter_map(|x| {
{
let body = x.body().map(|x| x.to_owned()).unwrap_or(Vec::new());
let uid = x.uid?;
Message::new(containing_folder.clone(), uid, body)
}
.ok()
})
.collect()
}
fn fetch_messages_from_last_seen_uid(
&mut self,
folder: &Folder,
) -> anyhow::Result<Vec<Message>> {
let last_uid = self.get_last_seen_uid(folder)?.unwrap_or(0);
self.ensure_selected(folder)?;
let response = self
.imap_session
.uid_fetch(format!("{}:*", last_uid + 1), "(FLAGS RFC822 UID)")
.with_context(|| format!("Failed to fetch messages in folder {}", folder.name))?;
let result = IMAPInbox::<T>::fetch_response_to_messages(response, folder);
let highest_uid = result.iter().map(|msg| msg.uid().unwrap_or(last_uid)).max();
if let Some(uid) = highest_uid {
self.conn.execute(
"UPDATE imap_folders SET last_seen_uid=?1 WHERE server_id=?2 AND name=?3",
params![uid, self.server_user_id, folder.name],
)?;
}
Ok(result)
}
}
impl<T: Read + Write + SetReadTimeout> Inbox for IMAPInbox<T> {
fn list_folders(&mut self) -> anyhow::Result<Vec<Folder>> {
let results = self.imap_session.list(None, Some("*"));
Ok(results?
.iter()
.map(|x| Folder {
name: x.name().to_owned(),
})
.collect())
}
fn fetch_all_messages_in_folder(&mut self, folder: &Folder) -> anyhow::Result<Vec<Message>> {
self.ensure_selected(folder)?;
let messages = self
.imap_session
.fetch("1:*", "(FLAGS RFC822 UID)")
.with_context(|| format!("Failed to fetch all messages in folder {}", folder.name))?;
Ok(IMAPInbox::<T>::fetch_response_to_messages(messages, folder))
}
fn move_message_to_folder(
&mut self,
message: &mut Message,
destination_folder: &Folder,
) -> anyhow::Result<()> {
let containing_folder = message
.containing_folder()
.ok_or(anyhow::format_err!("Message is invalid"))?;
let uid_set = message
.uid_set()
.ok_or(anyhow::format_err!("Message is invalid"))?;
self.ensure_selected(containing_folder)?;
if !self
.imap_session
.list(None, Some(&destination_folder.name))?
.iter()
.any(|f| f.name() == destination_folder.name)
{
info!("Existing folders:");
self.imap_session
.list(None, Some("*"))?
.iter()
.for_each(|f| info!(" - {}", f.name()));
return Err(anyhow::format_err!(
"Destination folder '{}' doesn't exist.",
destination_folder.name
));
}
if self.capabilities.has_move {
self.imap_session
.uid_mv(&uid_set, &destination_folder.name)?;
} else {
self.imap_session
.uid_store(&uid_set, "+FLAGS.SILENT \\Deleted")?;
self.imap_session
.uid_copy(&uid_set, &destination_folder.name)?;
self.imap_session.uid_expunge(&uid_set)?;
}
message.set_invalid();
Ok(())
}
fn delete_message(&mut self, message: &mut Message) -> anyhow::Result<()> {
let containing_folder = message
.containing_folder()
.ok_or(anyhow::format_err!("Message is invalid"))?;
let uid_set = message
.uid_set()
.ok_or(anyhow::format_err!("Message is invalid"))?;
self.ensure_selected(containing_folder)?;
self.imap_session
.uid_store(&uid_set, "+FLAGS (\\Deleted)")?;
self.imap_session.uid_expunge(&uid_set)?;
message.set_invalid();
Ok(())
}
fn poll_new_messages(&mut self, folder: &Folder) -> anyhow::Result<Vec<Message>> {
self.ensure_selected(folder)?;
if self.capabilities.has_idle {
loop {
let idle = self.imap_session.idle()?;
idle.wait_keepalive()?;
let last_uid = self.get_last_seen_uid(folder)?.unwrap_or(0);
let has_messages = {
let response = self
.imap_session
.uid_search(format!("UID {}:*", last_uid + 1))
.with_context(|| format!("Failed to SEARCH in folder {}", folder.name))?;
!response.is_empty()
};
if has_messages {
break;
}
}
} else {
loop {
let _ = self.imap_session.noop();
thread::sleep(time::Duration::from_secs(self.config.polling_delay.into()));
let last_uid = self.get_last_seen_uid(folder)?.unwrap_or(0);
let has_messages = {
let response = self
.imap_session
.uid_search(format!("UID {}:*", last_uid + 1))
.with_context(|| format!("Failed to SEARCH in folder {}", folder.name))?;
!response.is_empty()
};
if has_messages {
break;
}
}
}
self.fetch_messages_from_last_seen_uid(folder)
}
fn fetch_messages_in_folder(
&mut self,
folder: &Folder,
uid_start: UIDRange,
uid_end: UIDRange,
) -> anyhow::Result<Vec<Message>> {
self.ensure_selected(folder)?;
let uid_range = {
let start = match uid_start {
UIDRange::UID(uid) => uid.to_string(),
UIDRange::Any => String::from("*"),
};
let end = match uid_end {
UIDRange::UID(uid) => uid.to_string(),
UIDRange::Any => String::from("*"),
};
format!("{}:{}", start, end)
};
let messages = self
.imap_session
.fetch(uid_range, "(FLAGS RFC822 UID)")
.with_context(|| format!("Failed to fetch all messages in folder {}", folder.name))?;
Ok(IMAPInbox::<T>::fetch_response_to_messages(messages, folder))
}
fn fetch_top_n_messages_in_folder(
&mut self,
folder: &Folder,
n: u32,
) -> anyhow::Result<Vec<Message>> {
self.ensure_selected(folder)?;
if n == 0 {
return Ok(Vec::new());
}
let all_uids = self
.imap_session
.uid_search("ALL")
.with_context(|| format!("Failed to search messages in folder {}", folder.name))?;
if all_uids.is_empty() {
return Ok(Vec::new());
}
let mut sorted_uids: Vec<u32> = all_uids.into_iter().collect();
sorted_uids.sort();
let n = n as usize;
let start_idx = if sorted_uids.len() > n {
sorted_uids.len() - n
} else {
0
};
let top_uids: Vec<u32> = sorted_uids[start_idx..].to_vec();
let uid_set = top_uids
.iter()
.map(|uid| uid.to_string())
.collect::<Vec<_>>()
.join(",");
let messages = self
.imap_session
.uid_fetch(&uid_set, "(FLAGS RFC822 UID)")
.with_context(|| {
format!(
"Failed to fetch top {} messages in folder {}",
n, folder.name
)
})?;
Ok(IMAPInbox::<T>::fetch_response_to_messages(messages, folder))
}
}
impl<T: Read + Write> Drop for IMAPInbox<T> {
fn drop(&mut self) {
let _ = self.imap_session.logout();
}
}