use super::types::{
EmailChangesRequest, EmailChangesResponse, EmailQueryChangesRequest, EmailQueryChangesResponse,
};
use crate::methods::email_query_helpers::{
compute_query_diff, execute_email_query, execute_email_query_no_filter,
get_previous_query_results,
};
use crate::methods::ensure_account_ownership;
use crate::types::Principal;
use rusmes_storage::MessageStore;
pub async fn email_changes(
request: EmailChangesRequest,
message_store: &dyn MessageStore,
principal: &Principal,
) -> anyhow::Result<EmailChangesResponse> {
ensure_account_ownership(&request.account_id, principal)?;
let since_modseq: u64 = request
.since_state
.parse()
.map_err(|_| anyhow::anyhow!("Invalid state: {}", request.since_state))?;
let max_changes = request.max_changes.unwrap_or(100);
let current_modseq = super::get_current_modseq(message_store).await?;
let (created, updated, destroyed, has_more) =
query_email_changes(message_store, since_modseq, max_changes).await?;
let new_state = if has_more {
(since_modseq + max_changes).to_string()
} else {
current_modseq.to_string()
};
Ok(EmailChangesResponse {
account_id: request.account_id,
old_state: request.since_state,
new_state,
has_more_changes: has_more,
created,
updated,
destroyed,
})
}
pub async fn email_query_changes(
request: EmailQueryChangesRequest,
message_store: &dyn MessageStore,
principal: &Principal,
) -> anyhow::Result<EmailQueryChangesResponse> {
ensure_account_ownership(&request.account_id, principal)?;
let since_state: u64 = request
.since_query_state
.parse()
.map_err(|_| anyhow::anyhow!("Invalid query state: {}", request.since_query_state))?;
let mut current_results = if let Some(filter) = &request.filter {
execute_email_query(message_store, filter, request.sort.as_ref()).await?
} else {
execute_email_query_no_filter(message_store, request.sort.as_ref()).await?
};
let previous_results = get_previous_query_results(since_state).await?;
if let Some(ref up_to_id) = request.up_to_id {
if let Some(pos) = current_results.iter().position(|id| id == up_to_id) {
current_results.truncate(pos + 1);
}
}
let (removed, added) = compute_query_diff(&previous_results, ¤t_results);
let total_changes = removed.len() as u64 + added.len() as u64;
if let Some(explicit_max) = request.max_changes {
if total_changes > explicit_max {
return Err(anyhow::anyhow!(
"cannotCalculateChanges: {} changes exceed maxChanges={}",
total_changes,
explicit_max
));
}
}
let new_query_state = super::get_current_modseq(message_store).await?.to_string();
let total = if request.calculate_total.unwrap_or(false) {
Some(current_results.len() as u64)
} else {
None
};
Ok(EmailQueryChangesResponse {
account_id: request.account_id,
old_query_state: request.since_query_state,
new_query_state,
total,
removed,
added,
})
}
async fn query_email_changes(
_message_store: &dyn MessageStore,
_since_modseq: u64,
_max_changes: u64,
) -> anyhow::Result<(Vec<String>, Vec<String>, Vec<String>, bool)> {
Ok((Vec::new(), Vec::new(), Vec::new(), false))
}
#[cfg(test)]
mod tests {
use super::*;
use crate::methods::email_advanced::test_helpers::{create_test_store, test_principal};
#[tokio::test]
async fn test_email_changes() {
let store = create_test_store();
let request = EmailChangesRequest {
account_id: "acc1".to_string(),
since_state: "1".to_string(),
max_changes: Some(50),
};
let response = email_changes(request, store.as_ref(), &test_principal())
.await
.expect("email_changes failed");
assert_eq!(response.account_id, "acc1");
assert_eq!(response.old_state, "1");
assert!(!response.has_more_changes);
}
#[tokio::test]
async fn test_email_query_changes() {
let store = create_test_store();
let request = EmailQueryChangesRequest {
account_id: "acc1".to_string(),
since_query_state: "1".to_string(),
filter: None,
sort: None,
max_changes: Some(50),
up_to_id: None,
calculate_total: Some(true),
};
let response = email_query_changes(request, store.as_ref(), &test_principal())
.await
.expect("email_query_changes failed");
assert_eq!(response.account_id, "acc1");
assert!(response.total.is_some());
}
#[tokio::test]
async fn test_email_changes_max_changes() {
let store = create_test_store();
let request = EmailChangesRequest {
account_id: "acc1".to_string(),
since_state: "5".to_string(),
max_changes: Some(10),
};
let response = email_changes(request, store.as_ref(), &test_principal())
.await
.expect("email_changes max_changes failed");
assert_eq!(response.old_state, "5");
assert!(response.new_state.parse::<u64>().expect("parse u64") >= 5);
}
#[tokio::test]
async fn test_email_query_changes_with_filter() {
let store = create_test_store();
let filter = crate::types::EmailFilterCondition {
in_mailbox: Some("inbox".to_string()),
in_mailbox_other_than: None,
before: None,
after: None,
min_size: None,
max_size: None,
all_in_thread_have_keyword: None,
some_in_thread_have_keyword: None,
none_in_thread_have_keyword: None,
has_keyword: None,
not_keyword: None,
has_attachment: None,
text: None,
from: None,
to: None,
cc: None,
bcc: None,
subject: None,
body: None,
header: None,
};
let request = EmailQueryChangesRequest {
account_id: "acc1".to_string(),
since_query_state: "10".to_string(),
filter: Some(filter),
sort: None,
max_changes: None,
up_to_id: None,
calculate_total: Some(false),
};
let response = email_query_changes(request, store.as_ref(), &test_principal())
.await
.expect("email_query_changes with_filter failed");
assert!(response.total.is_none());
}
#[tokio::test]
async fn test_email_changes_empty_state() {
let store = create_test_store();
let request = EmailChangesRequest {
account_id: "acc1".to_string(),
since_state: "0".to_string(),
max_changes: None,
};
let response = email_changes(request, store.as_ref(), &test_principal())
.await
.expect("email_changes empty_state failed");
assert!(response.new_state.parse::<u64>().is_ok());
assert!(response.created.is_empty());
assert!(response.updated.is_empty());
assert!(response.destroyed.is_empty());
}
#[tokio::test]
async fn test_email_query_changes_calculate_total() {
let store = create_test_store();
let request = EmailQueryChangesRequest {
account_id: "acc1".to_string(),
since_query_state: "100".to_string(),
filter: None,
sort: None,
max_changes: Some(25),
up_to_id: Some("msg50".to_string()),
calculate_total: Some(true),
};
let response = email_query_changes(request, store.as_ref(), &test_principal())
.await
.expect("email_query_changes calculate_total failed");
assert!(response.total.is_some());
assert_eq!(response.total.expect("total"), 0);
}
#[tokio::test]
async fn test_email_changes_state_progression() {
let store = create_test_store();
let request1 = EmailChangesRequest {
account_id: "acc1".to_string(),
since_state: "1".to_string(),
max_changes: None,
};
let response1 = email_changes(request1, store.as_ref(), &test_principal())
.await
.expect("email_changes state_progression r1 failed");
let request2 = EmailChangesRequest {
account_id: "acc1".to_string(),
since_state: response1.new_state.clone(),
max_changes: None,
};
let response2 = email_changes(request2, store.as_ref(), &test_principal())
.await
.expect("email_changes state_progression r2 failed");
assert!(
response1.new_state.parse::<u64>().expect("parse u64 r1")
<= response2.new_state.parse::<u64>().expect("parse u64 r2")
);
}
#[tokio::test]
async fn test_email_changes_with_large_max_changes() {
let store = create_test_store();
let request = EmailChangesRequest {
account_id: "acc1".to_string(),
since_state: "100".to_string(),
max_changes: Some(10000),
};
let response = email_changes(request, store.as_ref(), &test_principal())
.await
.expect("email_changes large_max_changes failed");
assert_eq!(response.account_id, "acc1");
assert!(!response.has_more_changes);
}
#[tokio::test]
async fn test_email_query_changes_with_sort() {
let store = create_test_store();
let sort = vec![
crate::types::EmailSort {
property: "receivedAt".to_string(),
is_ascending: Some(false),
collation: None,
},
crate::types::EmailSort {
property: "subject".to_string(),
is_ascending: Some(true),
collation: Some("i;unicode-casemap".to_string()),
},
];
let request = EmailQueryChangesRequest {
account_id: "acc1".to_string(),
since_query_state: "50".to_string(),
filter: None,
sort: Some(sort),
max_changes: None,
up_to_id: None,
calculate_total: None,
};
let response = email_query_changes(request, store.as_ref(), &test_principal())
.await
.expect("email_query_changes with_sort failed");
assert_eq!(response.account_id, "acc1");
}
#[tokio::test]
async fn test_email_query_changes_with_up_to_id() {
let store = create_test_store();
let request = EmailQueryChangesRequest {
account_id: "acc1".to_string(),
since_query_state: "50".to_string(),
filter: None,
sort: None,
max_changes: Some(100),
up_to_id: Some("msg100".to_string()),
calculate_total: Some(false),
};
let response = email_query_changes(request, store.as_ref(), &test_principal())
.await
.expect("email_query_changes with_up_to_id failed");
assert_eq!(response.account_id, "acc1");
assert!(response.total.is_none());
}
#[tokio::test]
async fn test_email_changes_invalid_state() {
let store = create_test_store();
let request = EmailChangesRequest {
account_id: "acc1".to_string(),
since_state: "invalid".to_string(),
max_changes: None,
};
let result = email_changes(request, store.as_ref(), &test_principal()).await;
assert!(result.is_err());
}
#[tokio::test]
async fn test_email_query_changes_invalid_state() {
let store = create_test_store();
let request = EmailQueryChangesRequest {
account_id: "acc1".to_string(),
since_query_state: "invalid_state".to_string(),
filter: None,
sort: None,
max_changes: None,
up_to_id: None,
calculate_total: None,
};
let result = email_query_changes(request, store.as_ref(), &test_principal()).await;
assert!(result.is_err());
}
#[test]
fn test_query_changes_added() {
let previous = vec!["e1".to_string()];
let current = vec!["e1".to_string(), "e2".to_string()];
let (removed, added) = compute_query_diff(&previous, ¤t);
assert!(removed.is_empty(), "no removals expected");
assert_eq!(added.len(), 1);
assert_eq!(added[0].id, "e2");
assert_eq!(added[0].index, 1);
}
#[test]
fn test_query_changes_removed() {
let previous = vec!["e1".to_string(), "e2".to_string()];
let current = vec!["e1".to_string()];
let (removed, added) = compute_query_diff(&previous, ¤t);
assert_eq!(removed, vec!["e2".to_string()]);
assert!(added.is_empty(), "no additions expected");
}
#[tokio::test]
async fn test_query_changes_max_exceeded() {
let store = create_test_store();
let previous = vec!["old1".to_string(), "old2".to_string(), "old3".to_string()];
let current = vec!["new1".to_string(), "new2".to_string()];
let (removed, added) = compute_query_diff(&previous, ¤t);
let total = removed.len() as u64 + added.len() as u64;
assert_eq!(total, 5);
let req = EmailQueryChangesRequest {
account_id: "acc1".to_string(),
since_query_state: "0".to_string(),
filter: None,
sort: None,
max_changes: Some(0), up_to_id: None,
calculate_total: None,
};
let resp = email_query_changes(req, store.as_ref(), &test_principal())
.await
.expect("max_changes=0 with empty store should succeed (0 changes)");
assert!(resp.added.is_empty());
assert!(resp.removed.is_empty());
}
}