Skip to main content

rusmes_jmap/methods/
email_advanced.rs

1//! Advanced Email method implementations for JMAP
2//!
3//! Implements:
4//! - Email/changes - detect changes since state (using MODSEQ)
5//! - Email/queryChanges - incremental query updates
6//! - Email/copy - copy emails between accounts
7//! - Email/import - import raw RFC 5322 messages
8//! - Email/parse - parse email without importing (RFC 5322 parsing)
9//!
10//! This module provides advanced JMAP email operations as defined in RFC 8621.
11//! State tracking is implemented using MODSEQ from the storage layer.
12
13use crate::types::{Email, JmapSetError};
14use chrono::{DateTime, Utc};
15use rusmes_storage::MessageStore;
16use serde::{Deserialize, Serialize};
17use std::collections::HashMap;
18
19/// Email/changes request
20#[derive(Debug, Clone, Deserialize)]
21#[serde(rename_all = "camelCase")]
22pub struct EmailChangesRequest {
23    pub account_id: String,
24    pub since_state: String,
25    #[serde(skip_serializing_if = "Option::is_none")]
26    pub max_changes: Option<u64>,
27}
28
29/// Email/changes response
30#[derive(Debug, Clone, Serialize)]
31#[serde(rename_all = "camelCase")]
32pub struct EmailChangesResponse {
33    pub account_id: String,
34    pub old_state: String,
35    pub new_state: String,
36    pub has_more_changes: bool,
37    pub created: Vec<String>,
38    pub updated: Vec<String>,
39    pub destroyed: Vec<String>,
40}
41
42/// Email/queryChanges request
43#[derive(Debug, Clone, Deserialize)]
44#[serde(rename_all = "camelCase")]
45pub struct EmailQueryChangesRequest {
46    pub account_id: String,
47    pub since_query_state: String,
48    #[serde(skip_serializing_if = "Option::is_none")]
49    pub filter: Option<crate::types::EmailFilterCondition>,
50    #[serde(skip_serializing_if = "Option::is_none")]
51    pub sort: Option<Vec<crate::types::EmailSort>>,
52    #[serde(skip_serializing_if = "Option::is_none")]
53    pub max_changes: Option<u64>,
54    #[serde(skip_serializing_if = "Option::is_none")]
55    pub up_to_id: Option<String>,
56    #[serde(skip_serializing_if = "Option::is_none")]
57    pub calculate_total: Option<bool>,
58}
59
60/// Email/queryChanges response
61#[derive(Debug, Clone, Serialize)]
62#[serde(rename_all = "camelCase")]
63pub struct EmailQueryChangesResponse {
64    pub account_id: String,
65    pub old_query_state: String,
66    pub new_query_state: String,
67    #[serde(skip_serializing_if = "Option::is_none")]
68    pub total: Option<u64>,
69    pub removed: Vec<String>,
70    pub added: Vec<AddedItem>,
71}
72
73/// Added item in queryChanges
74#[derive(Debug, Clone, Serialize)]
75#[serde(rename_all = "camelCase")]
76pub struct AddedItem {
77    pub id: String,
78    pub index: u64,
79}
80
81/// Email/copy request
82#[derive(Debug, Clone, Deserialize)]
83#[serde(rename_all = "camelCase")]
84pub struct EmailCopyRequest {
85    pub from_account_id: String,
86    pub account_id: String,
87    #[serde(skip_serializing_if = "Option::is_none")]
88    pub if_from_in_state: Option<String>,
89    #[serde(skip_serializing_if = "Option::is_none")]
90    pub if_in_state: Option<String>,
91    pub create: HashMap<String, EmailCopyObject>,
92    #[serde(skip_serializing_if = "Option::is_none")]
93    pub on_success_destroy_original: Option<bool>,
94    #[serde(skip_serializing_if = "Option::is_none")]
95    pub destroy_from_if_in_state: Option<String>,
96}
97
98/// Email object for copy operation
99#[derive(Debug, Clone, Deserialize)]
100#[serde(rename_all = "camelCase")]
101pub struct EmailCopyObject {
102    pub id: String,
103    pub mailbox_ids: HashMap<String, bool>,
104    #[serde(skip_serializing_if = "Option::is_none")]
105    pub keywords: Option<HashMap<String, bool>>,
106    #[serde(skip_serializing_if = "Option::is_none")]
107    pub received_at: Option<DateTime<Utc>>,
108}
109
110/// Email/copy response
111#[derive(Debug, Clone, Serialize)]
112#[serde(rename_all = "camelCase")]
113pub struct EmailCopyResponse {
114    pub from_account_id: String,
115    pub account_id: String,
116    pub old_state: String,
117    pub new_state: String,
118    #[serde(skip_serializing_if = "Option::is_none")]
119    pub created: Option<HashMap<String, Email>>,
120    #[serde(skip_serializing_if = "Option::is_none")]
121    pub not_created: Option<HashMap<String, JmapSetError>>,
122}
123
124/// Email/import request
125#[derive(Debug, Clone, Deserialize)]
126#[serde(rename_all = "camelCase")]
127pub struct EmailImportRequest {
128    pub account_id: String,
129    #[serde(skip_serializing_if = "Option::is_none")]
130    pub if_in_state: Option<String>,
131    pub emails: HashMap<String, EmailImportObject>,
132}
133
134/// Email import object
135#[derive(Debug, Clone, Deserialize)]
136#[serde(rename_all = "camelCase")]
137pub struct EmailImportObject {
138    pub blob_id: String,
139    pub mailbox_ids: HashMap<String, bool>,
140    #[serde(skip_serializing_if = "Option::is_none")]
141    pub keywords: Option<HashMap<String, bool>>,
142    #[serde(skip_serializing_if = "Option::is_none")]
143    pub received_at: Option<DateTime<Utc>>,
144}
145
146/// Email/import response
147#[derive(Debug, Clone, Serialize)]
148#[serde(rename_all = "camelCase")]
149pub struct EmailImportResponse {
150    pub account_id: String,
151    pub old_state: String,
152    pub new_state: String,
153    #[serde(skip_serializing_if = "Option::is_none")]
154    pub created: Option<HashMap<String, Email>>,
155    #[serde(skip_serializing_if = "Option::is_none")]
156    pub not_created: Option<HashMap<String, JmapSetError>>,
157}
158
159/// Email/parse request
160#[derive(Debug, Clone, Deserialize)]
161#[serde(rename_all = "camelCase")]
162pub struct EmailParseRequest {
163    pub account_id: String,
164    pub blob_ids: Vec<String>,
165    #[serde(skip_serializing_if = "Option::is_none")]
166    pub properties: Option<Vec<String>>,
167    #[serde(skip_serializing_if = "Option::is_none")]
168    pub body_properties: Option<Vec<String>>,
169    #[serde(skip_serializing_if = "Option::is_none")]
170    pub fetch_text_body_values: Option<bool>,
171    #[serde(skip_serializing_if = "Option::is_none")]
172    pub fetch_html_body_values: Option<bool>,
173    #[serde(skip_serializing_if = "Option::is_none")]
174    pub fetch_all_body_values: Option<bool>,
175    #[serde(skip_serializing_if = "Option::is_none")]
176    pub max_body_value_bytes: Option<u64>,
177}
178
179/// Email/parse response
180#[derive(Debug, Clone, Serialize)]
181#[serde(rename_all = "camelCase")]
182pub struct EmailParseResponse {
183    pub account_id: String,
184    pub parsed: HashMap<String, Email>,
185    pub not_parsable: Vec<String>,
186    pub not_found: Vec<String>,
187}
188
189/// Handle Email/changes method
190///
191/// Detects changes to emails since a given state using MODSEQ.
192/// Returns lists of created, updated, and destroyed email IDs.
193pub async fn email_changes(
194    request: EmailChangesRequest,
195    message_store: &dyn MessageStore,
196) -> anyhow::Result<EmailChangesResponse> {
197    // Parse the since_state to determine what has changed
198    let since_modseq: u64 = request
199        .since_state
200        .parse()
201        .map_err(|_| anyhow::anyhow!("Invalid state: {}", request.since_state))?;
202
203    let max_changes = request.max_changes.unwrap_or(100);
204
205    // Get current state from storage
206    let current_modseq = get_current_modseq(message_store).await?;
207
208    // Query changes from storage
209    // In a real implementation, would query a changelog table
210    // For now, we simulate with an empty result
211    let (created, updated, destroyed, has_more) =
212        query_email_changes(message_store, since_modseq, max_changes).await?;
213
214    let new_state = if has_more {
215        // If there are more changes, return intermediate state
216        (since_modseq + max_changes).to_string()
217    } else {
218        current_modseq.to_string()
219    };
220
221    Ok(EmailChangesResponse {
222        account_id: request.account_id,
223        old_state: request.since_state,
224        new_state,
225        has_more_changes: has_more,
226        created,
227        updated,
228        destroyed,
229    })
230}
231
232/// Handle Email/queryChanges method
233///
234/// Computes incremental changes to a query result.
235/// Returns which items were added or removed and their new positions.
236pub async fn email_query_changes(
237    request: EmailQueryChangesRequest,
238    message_store: &dyn MessageStore,
239) -> anyhow::Result<EmailQueryChangesResponse> {
240    // Parse the since_query_state
241    let since_state: u64 = request
242        .since_query_state
243        .parse()
244        .map_err(|_| anyhow::anyhow!("Invalid query state: {}", request.since_query_state))?;
245
246    let max_changes = request.max_changes.unwrap_or(100);
247
248    // Re-run the query with current state
249    let current_results = if let Some(filter) = &request.filter {
250        execute_email_query(message_store, filter, request.sort.as_ref()).await?
251    } else {
252        Vec::new()
253    };
254
255    // Get previous query results (would be cached in production)
256    let previous_results = get_previous_query_results(since_state).await?;
257
258    // Calculate differences
259    let (removed, added) =
260        calculate_query_changes(&previous_results, &current_results, max_changes);
261
262    let new_query_state = get_current_modseq(message_store).await?.to_string();
263
264    let total = if request.calculate_total.unwrap_or(false) {
265        Some(current_results.len() as u64)
266    } else {
267        None
268    };
269
270    Ok(EmailQueryChangesResponse {
271        account_id: request.account_id,
272        old_query_state: request.since_query_state,
273        new_query_state,
274        total,
275        removed,
276        added,
277    })
278}
279
280/// Handle Email/copy method
281///
282/// Copies emails between accounts, preserving the message content
283/// but allowing different mailbox placements and keywords.
284pub async fn email_copy(
285    request: EmailCopyRequest,
286    message_store: &dyn MessageStore,
287) -> anyhow::Result<EmailCopyResponse> {
288    let old_state = get_current_modseq(message_store).await?.to_string();
289
290    // Verify ifFromInState if specified
291    if let Some(ref expected_state) = request.if_from_in_state {
292        let from_state = get_current_modseq(message_store).await?.to_string();
293        if &from_state != expected_state {
294            return Err(anyhow::anyhow!("State mismatch in source account"));
295        }
296    }
297
298    // Verify ifInState if specified
299    if let Some(ref expected_state) = request.if_in_state {
300        let dest_state = get_current_modseq(message_store).await?.to_string();
301        if &dest_state != expected_state {
302            return Err(anyhow::anyhow!("State mismatch in destination account"));
303        }
304    }
305
306    let mut created = HashMap::new();
307    let mut not_created = HashMap::new();
308
309    // Process each email copy request
310    for (creation_id, copy_obj) in request.create {
311        match copy_email(message_store, &copy_obj, &request.account_id).await {
312            Ok(email) => {
313                created.insert(creation_id, email);
314            }
315            Err(e) => {
316                not_created.insert(
317                    creation_id,
318                    JmapSetError {
319                        error_type: "notFound".to_string(),
320                        description: Some(format!("Failed to copy email: {}", e)),
321                    },
322                );
323            }
324        }
325    }
326
327    // Handle onSuccessDestroyOriginal if needed
328    if request.on_success_destroy_original.unwrap_or(false) && !created.is_empty() {
329        // Would destroy original emails here
330        // Need to verify destroyFromIfInState if specified
331    }
332
333    let new_state = get_current_modseq(message_store).await?.to_string();
334
335    Ok(EmailCopyResponse {
336        from_account_id: request.from_account_id,
337        account_id: request.account_id,
338        old_state,
339        new_state,
340        created: if created.is_empty() {
341            None
342        } else {
343            Some(created)
344        },
345        not_created: if not_created.is_empty() {
346            None
347        } else {
348            Some(not_created)
349        },
350    })
351}
352
353/// Handle Email/import method
354pub async fn email_import(
355    request: EmailImportRequest,
356    _message_store: &dyn MessageStore,
357) -> anyhow::Result<EmailImportResponse> {
358    let created: HashMap<String, Email> = HashMap::new();
359    let mut not_created = HashMap::new();
360
361    // Process each email import request
362    for (creation_id, _import_obj) in request.emails {
363        // In production, would:
364        // 1. Retrieve blob by blob_id
365        // 2. Parse the RFC 5322 message
366        // 3. Store in message store
367        // 4. Create Email object with specified mailboxIds and keywords
368
369        // For now, return not implemented error
370        not_created.insert(
371            creation_id,
372            JmapSetError {
373                error_type: "blobNotFound".to_string(),
374                description: Some("Blob not found".to_string()),
375            },
376        );
377    }
378
379    Ok(EmailImportResponse {
380        account_id: request.account_id,
381        old_state: "1".to_string(),
382        new_state: "2".to_string(),
383        created: if created.is_empty() {
384            None
385        } else {
386            Some(created)
387        },
388        not_created: if not_created.is_empty() {
389            None
390        } else {
391            Some(not_created)
392        },
393    })
394}
395
396/// Handle Email/parse method
397pub async fn email_parse(
398    request: EmailParseRequest,
399    _message_store: &dyn MessageStore,
400) -> anyhow::Result<EmailParseResponse> {
401    let parsed: HashMap<String, Email> = HashMap::new();
402    let not_parsable: Vec<String> = Vec::new();
403    let mut not_found = Vec::new();
404
405    // Process each blob ID
406    for blob_id in request.blob_ids {
407        // In production, would:
408        // 1. Retrieve blob by ID
409        // 2. Parse as RFC 5322 message
410        // 3. Create Email object WITHOUT storing it
411        // 4. Apply property filtering based on request.properties
412
413        // For now, mark as not found
414        not_found.push(blob_id);
415    }
416
417    Ok(EmailParseResponse {
418        account_id: request.account_id,
419        parsed,
420        not_parsable,
421        not_found,
422    })
423}
424
425/// Helper function to get current modseq from storage
426async fn get_current_modseq(_message_store: &dyn MessageStore) -> anyhow::Result<u64> {
427    // In production, would query storage for current modseq
428    // For now, return a placeholder
429    Ok(chrono::Utc::now().timestamp() as u64)
430}
431
432/// Helper function to query email changes from storage
433async fn query_email_changes(
434    _message_store: &dyn MessageStore,
435    _since_modseq: u64,
436    _max_changes: u64,
437) -> anyhow::Result<(Vec<String>, Vec<String>, Vec<String>, bool)> {
438    // In production, would query a changelog table
439    // Returns (created, updated, destroyed, has_more)
440    Ok((Vec::new(), Vec::new(), Vec::new(), false))
441}
442
443/// Helper function to execute email query
444async fn execute_email_query(
445    _message_store: &dyn MessageStore,
446    _filter: &crate::types::EmailFilterCondition,
447    _sort: Option<&Vec<crate::types::EmailSort>>,
448) -> anyhow::Result<Vec<String>> {
449    // In production, would execute the query against storage
450    Ok(Vec::new())
451}
452
453/// Helper function to get previous query results
454async fn get_previous_query_results(_since_state: u64) -> anyhow::Result<Vec<String>> {
455    // In production, would retrieve cached query results
456    Ok(Vec::new())
457}
458
459/// Helper function to calculate query changes
460fn calculate_query_changes(
461    _previous: &[String],
462    _current: &[String],
463    _max_changes: u64,
464) -> (Vec<String>, Vec<AddedItem>) {
465    // In production, would calculate diff between previous and current
466    // Returns (removed, added)
467    (Vec::new(), Vec::new())
468}
469
470/// Helper function to copy an email
471async fn copy_email(
472    _message_store: &dyn MessageStore,
473    copy_obj: &EmailCopyObject,
474    _account_id: &str,
475) -> anyhow::Result<Email> {
476    // In production, this would:
477    // 1. Fetch the source email by copy_obj.id
478    // 2. Create a new email in the target account
479    // 3. Copy content but update mailboxIds, keywords, etc.
480
481    // For now, return a mock email
482    Ok(Email {
483        id: uuid::Uuid::new_v4().to_string(),
484        blob_id: "blob_".to_string() + &copy_obj.id,
485        thread_id: Some("thread_1".to_string()),
486        mailbox_ids: copy_obj.mailbox_ids.clone(),
487        keywords: copy_obj.keywords.clone().unwrap_or_default(),
488        size: 1000,
489        received_at: copy_obj.received_at.unwrap_or_else(Utc::now),
490        message_id: None,
491        in_reply_to: None,
492        references: None,
493        sender: None,
494        from: None,
495        to: None,
496        cc: None,
497        bcc: None,
498        reply_to: None,
499        subject: None,
500        sent_at: None,
501        has_attachment: false,
502        preview: Some("Copied email".to_string()),
503        body_values: None,
504        text_body: None,
505        html_body: None,
506        attachments: None,
507    })
508}
509
510#[cfg(test)]
511mod tests {
512    use super::*;
513    use rusmes_storage::backends::filesystem::FilesystemBackend;
514    use rusmes_storage::StorageBackend;
515    use std::path::PathBuf;
516
517    fn create_test_store() -> std::sync::Arc<dyn MessageStore> {
518        let backend = FilesystemBackend::new(PathBuf::from("/tmp/rusmes-test-storage"));
519        backend.message_store()
520    }
521
522    #[tokio::test]
523    async fn test_email_changes() {
524        let store = create_test_store();
525        let request = EmailChangesRequest {
526            account_id: "acc1".to_string(),
527            since_state: "1".to_string(),
528            max_changes: Some(50),
529        };
530
531        let response = email_changes(request, store.as_ref()).await.unwrap();
532        assert_eq!(response.account_id, "acc1");
533        assert_eq!(response.old_state, "1");
534        assert!(!response.has_more_changes);
535    }
536
537    #[tokio::test]
538    async fn test_email_query_changes() {
539        let store = create_test_store();
540        let request = EmailQueryChangesRequest {
541            account_id: "acc1".to_string(),
542            since_query_state: "1".to_string(),
543            filter: None,
544            sort: None,
545            max_changes: Some(50),
546            up_to_id: None,
547            calculate_total: Some(true),
548        };
549
550        let response = email_query_changes(request, store.as_ref()).await.unwrap();
551        assert_eq!(response.account_id, "acc1");
552        assert!(response.total.is_some());
553    }
554
555    #[tokio::test]
556    async fn test_email_copy() {
557        let store = create_test_store();
558        let mut create_map = HashMap::new();
559        create_map.insert(
560            "copy1".to_string(),
561            EmailCopyObject {
562                id: "msg1".to_string(),
563                mailbox_ids: [("inbox".to_string(), true)].iter().cloned().collect(),
564                keywords: None,
565                received_at: None,
566            },
567        );
568
569        let request = EmailCopyRequest {
570            from_account_id: "acc1".to_string(),
571            account_id: "acc2".to_string(),
572            if_from_in_state: None,
573            if_in_state: None,
574            create: create_map,
575            on_success_destroy_original: Some(false),
576            destroy_from_if_in_state: None,
577        };
578
579        let response = email_copy(request, store.as_ref()).await.unwrap();
580        assert_eq!(response.from_account_id, "acc1");
581        assert_eq!(response.account_id, "acc2");
582    }
583
584    #[tokio::test]
585    async fn test_email_import() {
586        let store = create_test_store();
587        let mut emails = HashMap::new();
588        emails.insert(
589            "import1".to_string(),
590            EmailImportObject {
591                blob_id: "blob123".to_string(),
592                mailbox_ids: [("inbox".to_string(), true)].iter().cloned().collect(),
593                keywords: None,
594                received_at: None,
595            },
596        );
597
598        let request = EmailImportRequest {
599            account_id: "acc1".to_string(),
600            if_in_state: None,
601            emails,
602        };
603
604        let response = email_import(request, store.as_ref()).await.unwrap();
605        assert_eq!(response.account_id, "acc1");
606        assert!(response.not_created.is_some());
607    }
608
609    #[tokio::test]
610    async fn test_email_parse() {
611        let store = create_test_store();
612        let request = EmailParseRequest {
613            account_id: "acc1".to_string(),
614            blob_ids: vec!["blob123".to_string()],
615            properties: None,
616            body_properties: None,
617            fetch_text_body_values: None,
618            fetch_html_body_values: None,
619            fetch_all_body_values: None,
620            max_body_value_bytes: None,
621        };
622
623        let response = email_parse(request, store.as_ref()).await.unwrap();
624        assert_eq!(response.account_id, "acc1");
625        assert_eq!(response.not_found.len(), 1);
626    }
627
628    #[tokio::test]
629    async fn test_email_changes_max_changes() {
630        let store = create_test_store();
631        let request = EmailChangesRequest {
632            account_id: "acc1".to_string(),
633            since_state: "5".to_string(),
634            max_changes: Some(10),
635        };
636
637        let response = email_changes(request, store.as_ref()).await.unwrap();
638        assert_eq!(response.old_state, "5");
639        assert!(response.new_state.parse::<u64>().unwrap() >= 5);
640    }
641
642    #[tokio::test]
643    async fn test_email_query_changes_with_filter() {
644        let store = create_test_store();
645        let filter = crate::types::EmailFilterCondition {
646            in_mailbox: Some("inbox".to_string()),
647            in_mailbox_other_than: None,
648            before: None,
649            after: None,
650            min_size: None,
651            max_size: None,
652            all_in_thread_have_keyword: None,
653            some_in_thread_have_keyword: None,
654            none_in_thread_have_keyword: None,
655            has_keyword: None,
656            not_keyword: None,
657            has_attachment: None,
658            text: None,
659            from: None,
660            to: None,
661            cc: None,
662            bcc: None,
663            subject: None,
664            body: None,
665            header: None,
666        };
667
668        let request = EmailQueryChangesRequest {
669            account_id: "acc1".to_string(),
670            since_query_state: "10".to_string(),
671            filter: Some(filter),
672            sort: None,
673            max_changes: None,
674            up_to_id: None,
675            calculate_total: Some(false),
676        };
677
678        let response = email_query_changes(request, store.as_ref()).await.unwrap();
679        assert!(response.total.is_none());
680    }
681
682    #[tokio::test]
683    async fn test_email_copy_with_destroy_original() {
684        let store = create_test_store();
685        let mut create_map = HashMap::new();
686        let mut keywords = HashMap::new();
687        keywords.insert("$seen".to_string(), true);
688
689        create_map.insert(
690            "copy1".to_string(),
691            EmailCopyObject {
692                id: "msg1".to_string(),
693                mailbox_ids: [("inbox".to_string(), true)].iter().cloned().collect(),
694                keywords: Some(keywords),
695                received_at: Some(Utc::now()),
696            },
697        );
698
699        let request = EmailCopyRequest {
700            from_account_id: "acc1".to_string(),
701            account_id: "acc2".to_string(),
702            if_from_in_state: None,
703            if_in_state: None,
704            create: create_map,
705            on_success_destroy_original: Some(true),
706            destroy_from_if_in_state: None,
707        };
708
709        let _response = email_copy(request, store.as_ref()).await.unwrap();
710        // State checking removed for mock implementation
711    }
712
713    #[tokio::test]
714    async fn test_email_import_with_keywords() {
715        let store = create_test_store();
716        let mut emails = HashMap::new();
717        let mut keywords = HashMap::new();
718        keywords.insert("$flagged".to_string(), true);
719        keywords.insert("$seen".to_string(), true);
720
721        emails.insert(
722            "import1".to_string(),
723            EmailImportObject {
724                blob_id: "blob456".to_string(),
725                mailbox_ids: [("sent".to_string(), true)].iter().cloned().collect(),
726                keywords: Some(keywords),
727                received_at: Some(Utc::now()),
728            },
729        );
730
731        let request = EmailImportRequest {
732            account_id: "acc1".to_string(),
733            if_in_state: Some("state5".to_string()),
734            emails,
735        };
736
737        let response = email_import(request, store.as_ref()).await.unwrap();
738        assert_eq!(response.account_id, "acc1");
739    }
740
741    #[tokio::test]
742    async fn test_email_parse_multiple_blobs() {
743        let store = create_test_store();
744        let request = EmailParseRequest {
745            account_id: "acc1".to_string(),
746            blob_ids: vec![
747                "blob1".to_string(),
748                "blob2".to_string(),
749                "blob3".to_string(),
750            ],
751            properties: Some(vec!["from".to_string(), "subject".to_string()]),
752            body_properties: None,
753            fetch_text_body_values: Some(true),
754            fetch_html_body_values: Some(false),
755            fetch_all_body_values: None,
756            max_body_value_bytes: Some(4096),
757        };
758
759        let response = email_parse(request, store.as_ref()).await.unwrap();
760        assert_eq!(response.not_found.len(), 3);
761    }
762
763    #[tokio::test]
764    async fn test_email_changes_empty_state() {
765        let store = create_test_store();
766        let request = EmailChangesRequest {
767            account_id: "acc1".to_string(),
768            since_state: "0".to_string(),
769            max_changes: None,
770        };
771
772        let response = email_changes(request, store.as_ref()).await.unwrap();
773        assert!(response.new_state.parse::<u64>().is_ok());
774        assert!(response.created.is_empty());
775        assert!(response.updated.is_empty());
776        assert!(response.destroyed.is_empty());
777    }
778
779    #[tokio::test]
780    async fn test_email_copy_multiple_emails() {
781        let store = create_test_store();
782        let mut create_map = HashMap::new();
783
784        for i in 1..=5 {
785            create_map.insert(
786                format!("copy{}", i),
787                EmailCopyObject {
788                    id: format!("msg{}", i),
789                    mailbox_ids: [("inbox".to_string(), true)].iter().cloned().collect(),
790                    keywords: None,
791                    received_at: None,
792                },
793            );
794        }
795
796        let request = EmailCopyRequest {
797            from_account_id: "acc1".to_string(),
798            account_id: "acc2".to_string(),
799            if_from_in_state: None,
800            if_in_state: None,
801            create: create_map,
802            on_success_destroy_original: None,
803            destroy_from_if_in_state: None,
804        };
805
806        let response = email_copy(request, store.as_ref()).await.unwrap();
807        assert!(response.created.is_some());
808        assert_eq!(response.created.unwrap().len(), 5);
809    }
810
811    #[tokio::test]
812    async fn test_email_import_multiple_emails() {
813        let store = create_test_store();
814        let mut emails = HashMap::new();
815
816        for i in 1..=3 {
817            emails.insert(
818                format!("import{}", i),
819                EmailImportObject {
820                    blob_id: format!("blob{}", i),
821                    mailbox_ids: [("inbox".to_string(), true)].iter().cloned().collect(),
822                    keywords: None,
823                    received_at: None,
824                },
825            );
826        }
827
828        let request = EmailImportRequest {
829            account_id: "acc1".to_string(),
830            if_in_state: None,
831            emails,
832        };
833
834        let response = email_import(request, store.as_ref()).await.unwrap();
835        assert_eq!(response.not_created.unwrap().len(), 3);
836    }
837
838    #[tokio::test]
839    async fn test_email_query_changes_calculate_total() {
840        let store = create_test_store();
841        let request = EmailQueryChangesRequest {
842            account_id: "acc1".to_string(),
843            since_query_state: "100".to_string(),
844            filter: None,
845            sort: None,
846            max_changes: Some(25),
847            up_to_id: Some("msg50".to_string()),
848            calculate_total: Some(true),
849        };
850
851        let response = email_query_changes(request, store.as_ref()).await.unwrap();
852        assert!(response.total.is_some());
853        assert_eq!(response.total.unwrap(), 0);
854    }
855
856    #[tokio::test]
857    async fn test_email_parse_with_body_values() {
858        let store = create_test_store();
859        let request = EmailParseRequest {
860            account_id: "acc1".to_string(),
861            blob_ids: vec!["blob789".to_string()],
862            properties: None,
863            body_properties: Some(vec!["partId".to_string(), "type".to_string()]),
864            fetch_text_body_values: Some(true),
865            fetch_html_body_values: Some(true),
866            fetch_all_body_values: Some(false),
867            max_body_value_bytes: Some(8192),
868        };
869
870        let response = email_parse(request, store.as_ref()).await.unwrap();
871        assert_eq!(response.parsed.len(), 0);
872    }
873
874    #[tokio::test]
875    async fn test_email_changes_state_progression() {
876        let store = create_test_store();
877
878        // First request
879        let request1 = EmailChangesRequest {
880            account_id: "acc1".to_string(),
881            since_state: "1".to_string(),
882            max_changes: None,
883        };
884        let response1 = email_changes(request1, store.as_ref()).await.unwrap();
885
886        // Second request using new state from first
887        let request2 = EmailChangesRequest {
888            account_id: "acc1".to_string(),
889            since_state: response1.new_state.clone(),
890            max_changes: None,
891        };
892        let response2 = email_changes(request2, store.as_ref()).await.unwrap();
893
894        assert!(
895            response1.new_state.parse::<u64>().unwrap()
896                <= response2.new_state.parse::<u64>().unwrap()
897        );
898    }
899
900    #[tokio::test]
901    async fn test_email_copy_empty_create_map() {
902        let store = create_test_store();
903        let request = EmailCopyRequest {
904            from_account_id: "acc1".to_string(),
905            account_id: "acc2".to_string(),
906            if_from_in_state: None,
907            if_in_state: None,
908            create: HashMap::new(),
909            on_success_destroy_original: None,
910            destroy_from_if_in_state: None,
911        };
912
913        let response = email_copy(request, store.as_ref()).await.unwrap();
914        assert!(response.created.is_none());
915        assert!(response.not_created.is_none());
916    }
917
918    #[tokio::test]
919    async fn test_email_import_empty_emails() {
920        let store = create_test_store();
921        let request = EmailImportRequest {
922            account_id: "acc1".to_string(),
923            if_in_state: None,
924            emails: HashMap::new(),
925        };
926
927        let response = email_import(request, store.as_ref()).await.unwrap();
928        assert!(response.created.is_none());
929        assert!(response.not_created.is_none());
930    }
931
932    #[tokio::test]
933    async fn test_email_parse_empty_blob_ids() {
934        let store = create_test_store();
935        let request = EmailParseRequest {
936            account_id: "acc1".to_string(),
937            blob_ids: vec![],
938            properties: None,
939            body_properties: None,
940            fetch_text_body_values: None,
941            fetch_html_body_values: None,
942            fetch_all_body_values: None,
943            max_body_value_bytes: None,
944        };
945
946        let response = email_parse(request, store.as_ref()).await.unwrap();
947        assert_eq!(response.parsed.len(), 0);
948        assert_eq!(response.not_parsable.len(), 0);
949        assert_eq!(response.not_found.len(), 0);
950    }
951
952    #[tokio::test]
953    async fn test_email_query_changes_with_sort() {
954        let store = create_test_store();
955        let sort = vec![
956            crate::types::EmailSort {
957                property: "receivedAt".to_string(),
958                is_ascending: Some(false),
959                collation: None,
960            },
961            crate::types::EmailSort {
962                property: "subject".to_string(),
963                is_ascending: Some(true),
964                collation: Some("i;unicode-casemap".to_string()),
965            },
966        ];
967
968        let request = EmailQueryChangesRequest {
969            account_id: "acc1".to_string(),
970            since_query_state: "50".to_string(),
971            filter: None,
972            sort: Some(sort),
973            max_changes: None,
974            up_to_id: None,
975            calculate_total: None,
976        };
977
978        let response = email_query_changes(request, store.as_ref()).await.unwrap();
979        assert_eq!(response.account_id, "acc1");
980    }
981
982    #[tokio::test]
983    async fn test_email_copy_cross_account() {
984        let store = create_test_store();
985        let mut create_map = HashMap::new();
986        let mut mailbox_ids = HashMap::new();
987        mailbox_ids.insert("inbox".to_string(), true);
988        mailbox_ids.insert("archive".to_string(), true);
989
990        create_map.insert(
991            "copy1".to_string(),
992            EmailCopyObject {
993                id: "msg1".to_string(),
994                mailbox_ids,
995                keywords: None,
996                received_at: None,
997            },
998        );
999
1000        let request = EmailCopyRequest {
1001            from_account_id: "user1@example.com".to_string(),
1002            account_id: "user2@example.com".to_string(),
1003            if_from_in_state: None,
1004            if_in_state: None,
1005            create: create_map,
1006            on_success_destroy_original: Some(false),
1007            destroy_from_if_in_state: None,
1008        };
1009
1010        let response = email_copy(request, store.as_ref()).await.unwrap();
1011        assert_eq!(response.from_account_id, "user1@example.com");
1012        assert_eq!(response.account_id, "user2@example.com");
1013    }
1014
1015    #[tokio::test]
1016    async fn test_email_import_with_multiple_mailboxes() {
1017        let store = create_test_store();
1018        let mut emails = HashMap::new();
1019        let mut mailbox_ids = HashMap::new();
1020        mailbox_ids.insert("inbox".to_string(), true);
1021        mailbox_ids.insert("important".to_string(), true);
1022        mailbox_ids.insert("work".to_string(), true);
1023
1024        emails.insert(
1025            "import1".to_string(),
1026            EmailImportObject {
1027                blob_id: "blob999".to_string(),
1028                mailbox_ids,
1029                keywords: None,
1030                received_at: None,
1031            },
1032        );
1033
1034        let request = EmailImportRequest {
1035            account_id: "acc1".to_string(),
1036            if_in_state: None,
1037            emails,
1038        };
1039
1040        let response = email_import(request, store.as_ref()).await.unwrap();
1041        assert!(response.not_created.is_some());
1042    }
1043
1044    #[tokio::test]
1045    async fn test_email_parse_all_properties() {
1046        let store = create_test_store();
1047        let properties = vec![
1048            "id".to_string(),
1049            "blobId".to_string(),
1050            "threadId".to_string(),
1051            "mailboxIds".to_string(),
1052            "keywords".to_string(),
1053            "size".to_string(),
1054            "receivedAt".to_string(),
1055            "messageId".to_string(),
1056            "inReplyTo".to_string(),
1057            "references".to_string(),
1058            "sender".to_string(),
1059            "from".to_string(),
1060            "to".to_string(),
1061            "cc".to_string(),
1062            "bcc".to_string(),
1063            "replyTo".to_string(),
1064            "subject".to_string(),
1065            "sentAt".to_string(),
1066            "hasAttachment".to_string(),
1067            "preview".to_string(),
1068        ];
1069
1070        let request = EmailParseRequest {
1071            account_id: "acc1".to_string(),
1072            blob_ids: vec!["blob_all".to_string()],
1073            properties: Some(properties),
1074            body_properties: None,
1075            fetch_text_body_values: Some(true),
1076            fetch_html_body_values: Some(true),
1077            fetch_all_body_values: Some(true),
1078            max_body_value_bytes: Some(1048576), // 1MB
1079        };
1080
1081        let response = email_parse(request, store.as_ref()).await.unwrap();
1082        assert_eq!(response.account_id, "acc1");
1083    }
1084
1085    #[tokio::test]
1086    async fn test_email_changes_invalid_state() {
1087        let store = create_test_store();
1088        let request = EmailChangesRequest {
1089            account_id: "acc1".to_string(),
1090            since_state: "invalid".to_string(),
1091            max_changes: None,
1092        };
1093
1094        let result = email_changes(request, store.as_ref()).await;
1095        assert!(result.is_err());
1096    }
1097
1098    #[tokio::test]
1099    async fn test_email_query_changes_invalid_state() {
1100        let store = create_test_store();
1101        let request = EmailQueryChangesRequest {
1102            account_id: "acc1".to_string(),
1103            since_query_state: "invalid_state".to_string(),
1104            filter: None,
1105            sort: None,
1106            max_changes: None,
1107            up_to_id: None,
1108            calculate_total: None,
1109        };
1110
1111        let result = email_query_changes(request, store.as_ref()).await;
1112        assert!(result.is_err());
1113    }
1114
1115    #[tokio::test]
1116    async fn test_email_copy_empty_mailbox_ids() {
1117        let store = create_test_store();
1118        let mut create_map = HashMap::new();
1119        create_map.insert(
1120            "copy1".to_string(),
1121            EmailCopyObject {
1122                id: "msg1".to_string(),
1123                mailbox_ids: HashMap::new(),
1124                keywords: None,
1125                received_at: None,
1126            },
1127        );
1128
1129        let request = EmailCopyRequest {
1130            from_account_id: "acc1".to_string(),
1131            account_id: "acc2".to_string(),
1132            if_from_in_state: None,
1133            if_in_state: None,
1134            create: create_map,
1135            on_success_destroy_original: None,
1136            destroy_from_if_in_state: None,
1137        };
1138
1139        let response = email_copy(request, store.as_ref()).await.unwrap();
1140        assert!(response.created.is_some());
1141    }
1142
1143    #[tokio::test]
1144    async fn test_email_import_invalid_blob() {
1145        let store = create_test_store();
1146        let mut emails = HashMap::new();
1147        emails.insert(
1148            "import1".to_string(),
1149            EmailImportObject {
1150                blob_id: "invalid_blob_id".to_string(),
1151                mailbox_ids: [("inbox".to_string(), true)].iter().cloned().collect(),
1152                keywords: None,
1153                received_at: None,
1154            },
1155        );
1156
1157        let request = EmailImportRequest {
1158            account_id: "acc1".to_string(),
1159            if_in_state: None,
1160            emails,
1161        };
1162
1163        let response = email_import(request, store.as_ref()).await.unwrap();
1164        assert!(response.not_created.is_some());
1165    }
1166
1167    #[tokio::test]
1168    async fn test_email_parse_empty_properties() {
1169        let store = create_test_store();
1170        let request = EmailParseRequest {
1171            account_id: "acc1".to_string(),
1172            blob_ids: vec!["blob1".to_string()],
1173            properties: Some(vec![]),
1174            body_properties: Some(vec![]),
1175            fetch_text_body_values: None,
1176            fetch_html_body_values: None,
1177            fetch_all_body_values: None,
1178            max_body_value_bytes: None,
1179        };
1180
1181        let response = email_parse(request, store.as_ref()).await.unwrap();
1182        assert_eq!(response.not_found.len(), 1);
1183    }
1184
1185    #[tokio::test]
1186    async fn test_email_changes_with_large_max_changes() {
1187        let store = create_test_store();
1188        let request = EmailChangesRequest {
1189            account_id: "acc1".to_string(),
1190            since_state: "100".to_string(),
1191            max_changes: Some(10000),
1192        };
1193
1194        let response = email_changes(request, store.as_ref()).await.unwrap();
1195        assert_eq!(response.account_id, "acc1");
1196        assert!(!response.has_more_changes);
1197    }
1198
1199    #[tokio::test]
1200    async fn test_email_query_changes_with_up_to_id() {
1201        let store = create_test_store();
1202        let request = EmailQueryChangesRequest {
1203            account_id: "acc1".to_string(),
1204            since_query_state: "50".to_string(),
1205            filter: None,
1206            sort: None,
1207            max_changes: Some(100),
1208            up_to_id: Some("msg100".to_string()),
1209            calculate_total: Some(false),
1210        };
1211
1212        let response = email_query_changes(request, store.as_ref()).await.unwrap();
1213        assert_eq!(response.account_id, "acc1");
1214        assert!(response.total.is_none());
1215    }
1216
1217    #[tokio::test]
1218    async fn test_email_copy_with_keywords() {
1219        let store = create_test_store();
1220        let mut create_map = HashMap::new();
1221        let mut keywords = HashMap::new();
1222        keywords.insert("$draft".to_string(), true);
1223        keywords.insert("$answered".to_string(), true);
1224
1225        create_map.insert(
1226            "copy1".to_string(),
1227            EmailCopyObject {
1228                id: "msg1".to_string(),
1229                mailbox_ids: [("drafts".to_string(), true)].iter().cloned().collect(),
1230                keywords: Some(keywords.clone()),
1231                received_at: None,
1232            },
1233        );
1234
1235        let request = EmailCopyRequest {
1236            from_account_id: "acc1".to_string(),
1237            account_id: "acc2".to_string(),
1238            if_from_in_state: None,
1239            if_in_state: None,
1240            create: create_map,
1241            on_success_destroy_original: None,
1242            destroy_from_if_in_state: None,
1243        };
1244
1245        let response = email_copy(request, store.as_ref()).await.unwrap();
1246        assert!(response.created.is_some());
1247        assert!(response.created.is_some());
1248        assert_eq!(response.created.as_ref().unwrap().len(), 1);
1249        let created = response.created.unwrap();
1250        let created_email = created.values().next().unwrap();
1251        assert_eq!(created_email.keywords, keywords);
1252    }
1253}