use anyhow::{Context, Result, anyhow};
use chrono::{DateTime, Utc};
use colored::Colorize;
use futures::StreamExt;
use futures::stream;
use std::collections::HashSet;
use std::time::Duration;
use tokio::time::sleep;
use pidge_client::{AuthClient, ClientError, GraphClient};
use pidge_core::Config;
use crate::commands::mail_delete::parse_older_than;
use crate::commands::mail_fragment::{purge_from_cache, resolve};
pub async fn mark_read(fragment: String) -> Result<()> {
let (short_hash, msg) = resolve(&fragment)?;
with_graph(
|graph| async move { graph.mark_read(&msg.account, &msg.graph_id).await },
&short_hash,
)
.await?;
println!("{} Marked {} as read.", "✔".green(), short_hash.dimmed());
Ok(())
}
pub async fn mark_unread(fragment: String) -> Result<()> {
let (short_hash, msg) = resolve(&fragment)?;
with_graph(
|graph| async move { graph.mark_unread(&msg.account, &msg.graph_id).await },
&short_hash,
)
.await?;
println!("{} Marked {} as unread.", "✔".green(), short_hash.dimmed());
Ok(())
}
pub async fn flag(fragment: String) -> Result<()> {
let (short_hash, msg) = resolve(&fragment)?;
with_graph(
|graph| async move { graph.set_flag(&msg.account, &msg.graph_id, true).await },
&short_hash,
)
.await?;
println!("{} Flagged {}.", "✔".green(), short_hash.dimmed());
Ok(())
}
pub async fn unflag(fragment: String) -> Result<()> {
let (short_hash, msg) = resolve(&fragment)?;
with_graph(
|graph| async move { graph.set_flag(&msg.account, &msg.graph_id, false).await },
&short_hash,
)
.await?;
println!("{} Unflagged {}.", "✔".green(), short_hash.dimmed());
Ok(())
}
pub async fn archive(fragment: String) -> Result<()> {
let (short_hash, msg) = resolve(&fragment)?;
with_graph(
|graph| async move {
graph
.move_message(&msg.account, &msg.graph_id, "archive")
.await
},
&short_hash,
)
.await?;
let _ = purge_from_cache(&short_hash);
println!("{} Archived {}.", "✔".green(), short_hash.dimmed());
Ok(())
}
pub async fn archive_dispatch(
fragment: Option<String>,
from: Vec<String>,
older_than: Option<String>,
accounts: Vec<String>,
yes: bool,
) -> Result<()> {
match (fragment, from.is_empty(), older_than.as_ref()) {
(Some(f), true, None) => archive(f).await,
(None, false, _) | (None, _, Some(_)) => {
archive_bulk(from, older_than, accounts, yes).await
}
(None, true, None) => Err(anyhow!(
"Specify a fragment, `--from <sender>`, or `--older-than <spec>`. \
Run `pidge mail archive --help`."
)),
(Some(_), _, _) => unreachable!("clap enforces conflicts_with"),
}
}
async fn archive_bulk(
from: Vec<String>,
older_than: Option<String>,
account_filter: Vec<String>,
yes: bool,
) -> Result<()> {
if !yes {
return Err(anyhow!(
"Bulk archive requires explicit `-y` confirmation — there is no \
interactive prompt. Re-run with `-y` if you really mean it."
));
}
let cutoff: Option<DateTime<Utc>> = older_than.as_deref().map(parse_older_than).transpose()?;
let from_set: HashSet<String> = from.iter().map(|s| s.to_ascii_lowercase()).collect();
let config = Config::load()?;
if config.accounts.is_empty() {
return Err(anyhow!(
"No accounts signed in. Run `pidge account add` to add one."
));
}
let target_emails: Vec<String> = if account_filter.is_empty() {
config.accounts.iter().map(|a| a.email.clone()).collect()
} else {
for f in &account_filter {
if config.find(f).is_none() {
return Err(anyhow!("not signed in to {f}"));
}
}
account_filter
};
let filter_desc = describe_filter(&from_set, cutoff.as_ref(), older_than.as_deref());
let scope = if from_set.is_empty() {
"Inbox"
} else {
"mailbox"
};
println!(
"{} Archiving {scope} messages where {}…",
"Bulk".yellow().bold(),
filter_desc
);
let graph = GraphClient::new(AuthClient::from_env()?)?;
const PAGE_SIZE: usize = 50;
const MAX_PAGES: usize = 400;
let mut total = 0usize;
for email in &target_emails {
let count = if from_set.is_empty() {
archive_bulk_inbox_for_account(&graph, email, cutoff, PAGE_SIZE, MAX_PAGES).await?
} else {
archive_bulk_by_sender_for_account(&graph, email, &from_set, cutoff).await?
};
total += count;
println!(
"{} {}: archived {count} message{}",
"✔".green(),
email,
if count == 1 { "" } else { "s" }
);
}
println!("{} Total: {total}.", "✔".green().bold());
Ok(())
}
fn describe_filter(
from: &HashSet<String>,
cutoff: Option<&DateTime<Utc>>,
older_than_spec: Option<&str>,
) -> String {
let mut parts: Vec<String> = Vec::new();
if !from.is_empty() {
let mut senders: Vec<&str> = from.iter().map(String::as_str).collect();
senders.sort_unstable();
parts.push(format!("from is one of [{}]", senders.join(", ")));
}
if let (Some(c), Some(spec)) = (cutoff, older_than_spec) {
parts.push(format!(
"received before {} (cutoff {})",
spec,
c.format("%Y-%m-%d %H:%M UTC")
));
}
parts.join(" AND ")
}
async fn archive_bulk_inbox_for_account(
graph: &GraphClient,
account: &str,
cutoff: Option<DateTime<Utc>>,
page_size: usize,
max_pages: usize,
) -> Result<usize> {
let mut archived = 0usize;
let mut skip = 0usize;
for _ in 0..max_pages {
let result = graph
.list_inbox(account, page_size, skip, false)
.await
.context("listing inbox for bulk archive")?;
if result.messages.is_empty() {
break;
}
let to_archive: Vec<&pidge_core::Message> = result
.messages
.iter()
.filter(|m| cutoff.is_none_or(|c| m.received_at < c))
.collect();
let matched_now = to_archive.len();
archived += move_to_archive(graph, account, &to_archive).await;
let page_len = result.messages.len();
skip += page_len - matched_now;
if let Some(c) = cutoff {
let oldest = result
.messages
.iter()
.map(|m| m.received_at)
.min()
.unwrap_or(c);
if oldest >= c {
break;
}
}
if !result.has_more && matched_now == 0 {
break;
}
}
Ok(archived)
}
async fn archive_bulk_by_sender_for_account(
graph: &GraphClient,
account: &str,
from_set: &HashSet<String>,
cutoff: Option<DateTime<Utc>>,
) -> Result<usize> {
const SEARCH_LIMIT: usize = 1000;
let mut archived = 0usize;
for sender in from_set {
let messages = match graph
.search_messages(account, &format!("from:{sender}"), SEARCH_LIMIT)
.await
{
Ok(m) => m,
Err(e) => {
eprintln!(" {} search failed for {}: {e}", "!".red(), sender.dimmed());
continue;
}
};
let matches: Vec<&pidge_core::Message> = messages
.iter()
.filter(|m| {
m.from.address.to_ascii_lowercase() == *sender
&& cutoff.is_none_or(|c| m.received_at < c)
})
.collect();
archived += move_to_archive(graph, account, &matches).await;
}
Ok(archived)
}
async fn move_to_archive(
graph: &GraphClient,
account: &str,
messages: &[&pidge_core::Message],
) -> usize {
const MAX_INFLIGHT: usize = 4;
let mut ok = 0usize;
let tasks = messages.iter().map(|m| {
let id = m.id.clone();
let short = pidge_core::short_hash(&m.id);
async move {
let res = move_with_retry(graph, account, &id).await;
(short, res)
}
});
let mut stream = stream::iter(tasks).buffer_unordered(MAX_INFLIGHT);
while let Some((short, res)) = stream.next().await {
match res {
Ok(()) => {
ok += 1;
let _ = purge_from_cache(&short);
}
Err(ClientError::Graph { status: 404, .. }) => { }
Err(e) => {
eprintln!(" {} failed to archive {}: {e}", "!".red(), short.dimmed());
}
}
}
ok
}
async fn move_with_retry(
graph: &GraphClient,
account: &str,
message_id: &str,
) -> Result<(), ClientError> {
const MAX_RETRIES: u32 = 5;
let mut delay_ms = 500u64;
for attempt in 0..=MAX_RETRIES {
match graph.move_message(account, message_id, "archive").await {
Ok(()) => return Ok(()),
Err(ClientError::Graph { status: 429, .. }) if attempt < MAX_RETRIES => {
sleep(Duration::from_millis(delay_ms)).await;
delay_ms = (delay_ms * 2).min(8_000);
}
Err(e) => return Err(e),
}
}
unreachable!("loop returns on Ok or after MAX_RETRIES exits")
}
async fn with_graph<F, Fut>(op: F, short_hash: &str) -> Result<()>
where
F: FnOnce(GraphClient) -> Fut,
Fut: std::future::Future<Output = Result<(), ClientError>>,
{
let graph = GraphClient::new(AuthClient::from_env()?)?;
match op(graph).await {
Ok(()) => Ok(()),
Err(ClientError::Graph { status: 404, .. }) => {
let _ = purge_from_cache(short_hash);
Err(anyhow!(
"Message not found on server (it may have been deleted or moved). \
Run `pidge mail` to refresh the cache."
))
}
Err(e) => Err(e.into()),
}
}