1use axum::{
2 extract::ws::{Message as WebSocketMessage, WebSocket, WebSocketUpgrade},
3 extract::{Path as AxumPath, Query, State},
4 http::HeaderMap,
5 http::StatusCode,
6 response::{IntoResponse, Response},
7 routing::{get, post},
8 Json, Router,
9};
10use chrono::{DateTime, Datelike, Local, Utc};
11use futures::{SinkExt, StreamExt};
12use mxr_compose::{
13 frontmatter::{parse_compose_file, render_compose_file, ComposeFrontmatter},
14 parse::parse_address_list,
15 render::render_markdown,
16 validate_draft, ComposeKind, ComposeValidation,
17};
18use mxr_config::load_config;
19use mxr_core::{
20 id::LabelId,
21 id::{AccountId, DraftId, MessageId, ThreadId},
22 types::{
23 Draft, Envelope, Label, LabelKind, MessageBody, MessageFlags, ReplyHeaders, SavedSearch,
24 SearchMode, SortOrder, SubscriptionSummary,
25 },
26};
27use mxr_protocol::{IpcCodec, IpcMessage, IpcPayload, Request, ResponseData, SearchResultItem};
28use serde::{Deserialize, Serialize};
29use serde_json::json;
30use std::collections::{HashMap, HashSet};
31use std::net::SocketAddr;
32use std::path::{Path, PathBuf};
33use tokio::net::TcpListener;
34use tokio::net::UnixStream;
35use tokio_util::codec::Framed;
36use tower_http::cors::CorsLayer;
37use uuid::Uuid;
38
39#[derive(Debug, Clone)]
40pub struct WebServerConfig {
41 pub socket_path: PathBuf,
42 pub auth_token: String,
43}
44
45impl WebServerConfig {
46 pub fn new(socket_path: PathBuf, auth_token: String) -> Self {
47 Self {
48 socket_path,
49 auth_token,
50 }
51 }
52}
53
54pub fn app(_config: WebServerConfig) -> Router {
55 Router::new()
56 .route("/status", get(status))
57 .route("/shell", get(shell))
58 .route("/mailbox", get(mailbox))
59 .route("/search", get(search))
60 .route("/thread/{thread_id}", get(thread))
61 .route("/thread/{thread_id}/export", get(export_thread))
62 .route("/compose/session", post(start_compose_session))
63 .route("/compose/session/refresh", post(refresh_compose_session))
64 .route("/compose/session/update", post(update_compose_session))
65 .route("/compose/session/send", post(send_compose_session))
66 .route("/compose/session/save", post(save_compose_session))
67 .route("/compose/session/discard", post(discard_compose_session))
68 .route("/rules", get(rules))
69 .route("/rules/detail", get(rule_detail))
70 .route("/rules/form", get(rule_form))
71 .route("/rules/history", get(rule_history))
72 .route("/rules/dry-run", get(rule_dry_run))
73 .route("/rules/upsert", post(upsert_rule))
74 .route("/rules/upsert-form", post(upsert_rule_form))
75 .route("/rules/delete", post(delete_rule))
76 .route("/accounts", get(accounts))
77 .route("/accounts/test", post(test_account))
78 .route("/accounts/upsert", post(upsert_account))
79 .route("/accounts/default", post(set_default_account))
80 .route("/diagnostics", get(diagnostics))
81 .route("/diagnostics/bug-report", get(generate_bug_report))
82 .route("/mutations/archive", post(archive))
83 .route("/mutations/trash", post(trash))
84 .route("/mutations/spam", post(spam))
85 .route("/mutations/star", post(star))
86 .route("/mutations/read", post(mark_read))
87 .route("/mutations/read-and-archive", post(mark_read_and_archive))
88 .route("/mutations/labels", post(modify_labels))
89 .route("/mutations/move", post(move_messages))
90 .route("/actions/snooze/presets", get(snooze_presets))
91 .route("/actions/snooze", post(snooze))
92 .route("/actions/unsubscribe", post(unsubscribe))
93 .route("/attachments/open", post(open_attachment))
94 .route("/attachments/download", post(download_attachment))
95 .route("/events", get(events))
96 .with_state(AppState::new(_config))
97 .layer(CorsLayer::permissive())
98}
99
100pub async fn serve(listener: TcpListener, config: WebServerConfig) -> std::io::Result<()> {
101 axum::serve(listener, app(config)).await
102}
103
104pub async fn bind_and_serve(
105 host: std::net::IpAddr,
106 port: u16,
107 config: WebServerConfig,
108) -> std::io::Result<SocketAddr> {
109 let listener = TcpListener::bind((host, port)).await?;
110 let addr = listener.local_addr()?;
111 tokio::spawn(async move {
112 let _ = serve(listener, config).await;
113 });
114 Ok(addr)
115}
116
117#[derive(Clone)]
118struct AppState {
119 config: WebServerConfig,
120}
121
122impl AppState {
123 fn new(config: WebServerConfig) -> Self {
124 Self { config }
125 }
126}
127
128#[derive(Debug, thiserror::Error)]
129enum BridgeError {
130 #[error("failed to connect to mxr daemon at {0}")]
131 Connect(String),
132 #[error("ipc error: {0}")]
133 Ipc(String),
134 #[error("unauthorized")]
135 Unauthorized,
136 #[error("unexpected response from daemon")]
137 UnexpectedResponse,
138}
139
140impl IntoResponse for BridgeError {
141 fn into_response(self) -> Response {
142 let status = match self {
143 Self::Unauthorized => StatusCode::UNAUTHORIZED,
144 _ => StatusCode::BAD_GATEWAY,
145 };
146 (
147 status,
148 Json(serde_json::json!({ "error": self.to_string() })),
149 )
150 .into_response()
151 }
152}
153
154#[derive(Debug, Default, Deserialize)]
155struct AuthQuery {
156 #[serde(default)]
157 token: Option<String>,
158}
159
160fn ensure_authorized(
161 headers: &HeaderMap,
162 query_token: Option<&str>,
163 expected_token: &str,
164) -> Result<(), BridgeError> {
165 let header_token = headers
166 .get("x-mxr-bridge-token")
167 .and_then(|value| value.to_str().ok());
168 let provided = header_token.or(query_token);
169 if provided == Some(expected_token) {
170 return Ok(());
171 }
172 Err(BridgeError::Unauthorized)
173}
174
175async fn status(
176 State(state): State<AppState>,
177 headers: HeaderMap,
178 Query(auth): Query<AuthQuery>,
179) -> Result<Json<serde_json::Value>, BridgeError> {
180 ensure_authorized(&headers, auth.token.as_deref(), &state.config.auth_token)?;
181 match ipc_request(&state.config.socket_path, Request::GetStatus).await? {
182 ResponseData::Status {
183 uptime_secs,
184 accounts,
185 total_messages,
186 daemon_pid,
187 sync_statuses,
188 protocol_version,
189 daemon_version,
190 daemon_build_id,
191 repair_required,
192 } => Ok(Json(serde_json::json!({
193 "uptime_secs": uptime_secs,
194 "accounts": accounts,
195 "total_messages": total_messages,
196 "daemon_pid": daemon_pid,
197 "sync_statuses": sync_statuses,
198 "protocol_version": protocol_version,
199 "daemon_version": daemon_version,
200 "daemon_build_id": daemon_build_id,
201 "repair_required": repair_required,
202 }))),
203 _ => Err(BridgeError::UnexpectedResponse),
204 }
205}
206
207async fn shell(
208 State(state): State<AppState>,
209 headers: HeaderMap,
210 Query(auth): Query<AuthQuery>,
211) -> Result<Json<serde_json::Value>, BridgeError> {
212 ensure_authorized(&headers, auth.token.as_deref(), &state.config.auth_token)?;
213 let lens = MailboxLensRequest::default();
214 let chrome = build_bridge_chrome(&state.config.socket_path, &lens).await?;
215 Ok(Json(json!({
216 "shell": chrome.shell,
217 "sidebar": chrome.sidebar,
218 })))
219}
220
221#[derive(Debug, Default, Deserialize)]
222struct MailboxQuery {
223 #[serde(default = "default_limit")]
224 limit: u32,
225 #[serde(default)]
226 offset: u32,
227 #[serde(default)]
228 lens_kind: MailboxLensKind,
229 #[serde(default)]
230 label_id: Option<String>,
231 #[serde(default)]
232 saved_search: Option<String>,
233 #[serde(default)]
234 sender_email: Option<String>,
235 #[serde(default)]
236 token: Option<String>,
237}
238
239#[derive(Debug, Clone, Default, Deserialize, PartialEq, Eq)]
240#[serde(rename_all = "snake_case")]
241enum MailboxLensKind {
242 #[default]
243 Inbox,
244 AllMail,
245 Label,
246 SavedSearch,
247 Subscription,
248}
249
250#[derive(Debug, Clone, Default, PartialEq, Eq)]
251struct MailboxLensRequest {
252 kind: MailboxLensKind,
253 label_id: Option<String>,
254 saved_search: Option<String>,
255 sender_email: Option<String>,
256}
257
258impl MailboxQuery {
259 fn lens(&self) -> MailboxLensRequest {
260 MailboxLensRequest {
261 kind: self.lens_kind.clone(),
262 label_id: self.label_id.clone(),
263 saved_search: self.saved_search.clone(),
264 sender_email: self.sender_email.clone(),
265 }
266 }
267}
268
269fn default_limit() -> u32 {
270 200
271}
272
273#[derive(Debug, Deserialize)]
274struct SearchQuery {
275 #[serde(default)]
276 q: String,
277 #[serde(default = "default_limit")]
278 limit: u32,
279 #[serde(default)]
280 mode: Option<SearchMode>,
281 #[serde(default)]
282 scope: Option<String>,
283 #[serde(default)]
284 sort: Option<String>,
285 #[serde(default)]
286 explain: bool,
287 #[serde(default)]
288 token: Option<String>,
289}
290
291#[derive(Debug)]
292struct BridgeChrome {
293 shell: serde_json::Value,
294 sidebar: serde_json::Value,
295 labels: Vec<Label>,
296 inbox_label_id: Option<mxr_core::LabelId>,
297 searches: Vec<SavedSearch>,
298 subscriptions: Vec<SubscriptionSummary>,
299}
300
301#[derive(Debug, Serialize)]
302struct MessageRowView {
303 id: String,
304 thread_id: String,
305 provider_id: String,
306 sender: String,
307 sender_detail: Option<String>,
308 subject: String,
309 snippet: String,
310 date_label: String,
311 unread: bool,
312 starred: bool,
313 has_attachments: bool,
314}
315
316#[derive(Debug, Serialize)]
317struct MessageGroupView {
318 id: String,
319 label: String,
320 rows: Vec<MessageRowView>,
321}
322
323#[derive(Debug, Deserialize)]
324struct MessageIdsRequest {
325 message_ids: Vec<String>,
326}
327
328#[derive(Debug, Deserialize)]
329struct StarRequest {
330 message_ids: Vec<String>,
331 starred: bool,
332}
333
334#[derive(Debug, Deserialize)]
335struct ReadRequest {
336 message_ids: Vec<String>,
337 read: bool,
338}
339
340#[derive(Debug, Deserialize)]
341#[serde(rename_all = "snake_case")]
342enum ComposeSessionKindRequest {
343 New,
344 Reply,
345 ReplyAll,
346 Forward,
347}
348
349#[derive(Debug, Deserialize)]
350struct ComposeSessionStartRequest {
351 kind: ComposeSessionKindRequest,
352 #[serde(default)]
353 message_id: Option<String>,
354 #[serde(default)]
355 to: Option<String>,
356}
357
358#[derive(Debug, Deserialize)]
359struct ComposeSessionPathRequest {
360 draft_path: String,
361}
362
363#[derive(Debug, Deserialize)]
364struct ComposeSessionUpdateRequest {
365 draft_path: String,
366 to: String,
367 cc: String,
368 bcc: String,
369 subject: String,
370 from: String,
371 #[serde(default)]
372 attach: Vec<String>,
373}
374
375#[derive(Debug, Deserialize)]
376struct ComposeSessionSendRequest {
377 draft_path: String,
378 account_id: String,
379}
380
381#[derive(Debug, Deserialize)]
382struct ModifyLabelsRequest {
383 message_ids: Vec<String>,
384 #[serde(default)]
385 add: Vec<String>,
386 #[serde(default)]
387 remove: Vec<String>,
388}
389
390#[derive(Debug, Deserialize)]
391struct MoveRequest {
392 message_ids: Vec<String>,
393 target_label: String,
394}
395
396#[derive(Debug, Deserialize)]
397struct RuleQuery {
398 rule: String,
399 #[serde(default)]
400 token: Option<String>,
401}
402
403#[derive(Debug, Deserialize)]
404struct DeleteRuleRequest {
405 rule: String,
406}
407
408#[derive(Debug, Deserialize)]
409struct UpsertRuleRequest {
410 rule: serde_json::Value,
411}
412
413#[derive(Debug, Deserialize)]
414struct UpsertRuleFormRequest {
415 existing_rule: Option<String>,
416 name: String,
417 condition: String,
418 action: String,
419 priority: i32,
420 enabled: bool,
421}
422
423#[derive(Debug, Deserialize)]
424struct SetDefaultAccountRequest {
425 key: String,
426}
427
428#[derive(Debug, Deserialize)]
429struct AttachmentRequest {
430 message_id: String,
431 attachment_id: String,
432}
433
434#[derive(Debug, Deserialize)]
435struct UnsubscribeRequest {
436 message_id: String,
437}
438
439#[derive(Debug, Deserialize)]
440struct SnoozeRequest {
441 message_id: String,
442 until: String,
443}
444
445#[derive(Debug, Clone, Serialize)]
446struct ComposeIssueView {
447 severity: &'static str,
448 message: String,
449}
450
451struct MailboxSelection {
452 lens_label: String,
453 counts: serde_json::Value,
454 envelopes: Vec<Envelope>,
455}
456
457async fn mailbox(
458 State(state): State<AppState>,
459 headers: HeaderMap,
460 Query(query): Query<MailboxQuery>,
461) -> Result<Json<serde_json::Value>, BridgeError> {
462 ensure_authorized(&headers, query.token.as_deref(), &state.config.auth_token)?;
463 let lens = query.lens();
464 let chrome = build_bridge_chrome(&state.config.socket_path, &lens).await?;
465 let mailbox = load_mailbox_selection(
466 &state.config.socket_path,
467 &chrome,
468 &lens,
469 query.limit,
470 query.offset,
471 )
472 .await?;
473 Ok(Json(json!({
474 "shell": chrome.shell,
475 "sidebar": chrome.sidebar,
476 "mailbox": {
477 "lensLabel": mailbox.lens_label,
478 "counts": mailbox.counts,
479 "groups": group_envelopes(mailbox.envelopes),
480 }
481 })))
482}
483
484async fn thread(
485 State(state): State<AppState>,
486 headers: HeaderMap,
487 Query(auth): Query<AuthQuery>,
488 AxumPath(thread_id): AxumPath<String>,
489) -> Result<Json<serde_json::Value>, BridgeError> {
490 ensure_authorized(&headers, auth.token.as_deref(), &state.config.auth_token)?;
491 let thread_id = parse_thread_id(&thread_id)?;
492 match ipc_request(&state.config.socket_path, Request::GetThread { thread_id }).await? {
493 ResponseData::Thread { thread, messages } => {
494 let bodies = match ipc_request(
495 &state.config.socket_path,
496 Request::ListBodies {
497 message_ids: messages
498 .iter()
499 .map(|message| message.id.clone())
500 .collect::<Vec<MessageId>>(),
501 },
502 )
503 .await?
504 {
505 ResponseData::Bodies { bodies } => bodies,
506 _ => return Err(BridgeError::UnexpectedResponse),
507 };
508
509 let attachment_count = bodies
510 .iter()
511 .map(|body| body.attachments.len())
512 .sum::<usize>();
513
514 Ok(Json(json!({
515 "thread": thread,
516 "messages": messages.iter().map(message_row_view).collect::<Vec<_>>(),
517 "bodies": bodies,
518 "reader_mode": thread_reader_mode(&bodies),
519 "right_rail": {
520 "title": "Thread context",
521 "items": [
522 format!("{} messages", thread.message_count),
523 format!("{} unread", thread.unread_count),
524 format!("{} participants", thread.participants.len()),
525 if attachment_count == 0 {
526 "No attachments".to_string()
527 } else {
528 format!("{attachment_count} attachments")
529 }
530 ],
531 }
532 })))
533 }
534 _ => Err(BridgeError::UnexpectedResponse),
535 }
536}
537
538async fn export_thread(
539 State(state): State<AppState>,
540 headers: HeaderMap,
541 Query(auth): Query<AuthQuery>,
542 AxumPath(thread_id): AxumPath<String>,
543) -> Result<Json<serde_json::Value>, BridgeError> {
544 ensure_authorized(&headers, auth.token.as_deref(), &state.config.auth_token)?;
545 match ipc_request(
546 &state.config.socket_path,
547 Request::ExportThread {
548 thread_id: parse_thread_id(&thread_id)?,
549 format: mxr_core::types::ExportFormat::Markdown,
550 },
551 )
552 .await?
553 {
554 ResponseData::ExportResult { content } => Ok(Json(json!({ "content": content }))),
555 _ => Err(BridgeError::UnexpectedResponse),
556 }
557}
558
559async fn search(
560 State(state): State<AppState>,
561 headers: HeaderMap,
562 Query(query): Query<SearchQuery>,
563) -> Result<Json<serde_json::Value>, BridgeError> {
564 ensure_authorized(&headers, query.token.as_deref(), &state.config.auth_token)?;
565 if query.q.trim().is_empty() {
566 return Ok(Json(json!({
567 "scope": query.scope.unwrap_or_else(|| "threads".to_string()),
568 "sort": query.sort.unwrap_or_else(|| "recent".to_string()),
569 "mode": query.mode.unwrap_or_default(),
570 "total": 0,
571 "has_more": false,
572 "groups": [],
573 "explain": serde_json::Value::Null,
574 })));
575 }
576
577 let sort = match query.sort.as_deref() {
578 Some("relevant") => SortOrder::Relevance,
579 Some("oldest") => SortOrder::DateAsc,
580 _ => SortOrder::DateDesc,
581 };
582
583 let thread_scope = query.scope.as_deref().unwrap_or("threads") == "threads";
584
585 match ipc_request(
586 &state.config.socket_path,
587 Request::Search {
588 query: query.q,
589 limit: query.limit,
590 offset: 0,
591 mode: query.mode,
592 sort: Some(sort),
593 explain: query.explain,
594 },
595 )
596 .await?
597 {
598 ResponseData::SearchResults {
599 results,
600 explain,
601 has_more,
602 } => {
603 let effective_results = if thread_scope {
604 dedupe_search_results_by_thread(results)
605 } else {
606 results
607 };
608 let message_ids = effective_results
609 .iter()
610 .map(|result| result.message_id.clone())
611 .collect::<Vec<_>>();
612 let envelopes = if message_ids.is_empty() {
613 Vec::new()
614 } else {
615 match ipc_request(
616 &state.config.socket_path,
617 Request::ListEnvelopesByIds {
618 message_ids: message_ids.clone(),
619 },
620 )
621 .await?
622 {
623 ResponseData::Envelopes { envelopes } => {
624 reorder_envelopes(envelopes, &message_ids)
625 }
626 _ => return Err(BridgeError::UnexpectedResponse),
627 }
628 };
629
630 Ok(Json(json!({
631 "scope": query.scope.unwrap_or_else(|| "threads".to_string()),
632 "sort": query.sort.unwrap_or_else(|| "recent".to_string()),
633 "mode": query.mode.unwrap_or_default(),
634 "total": effective_results.len(),
635 "has_more": has_more,
636 "groups": group_envelopes(envelopes),
637 "explain": explain,
638 })))
639 }
640 _ => Err(BridgeError::UnexpectedResponse),
641 }
642}
643
644async fn start_compose_session(
645 State(state): State<AppState>,
646 headers: HeaderMap,
647 Query(auth): Query<AuthQuery>,
648 Json(request): Json<ComposeSessionStartRequest>,
649) -> Result<Json<serde_json::Value>, BridgeError> {
650 ensure_authorized(&headers, auth.token.as_deref(), &state.config.auth_token)?;
651 let session = create_compose_session(&state.config.socket_path, request).await?;
652 Ok(Json(json!({ "session": session })))
653}
654
655async fn refresh_compose_session(
656 State(state): State<AppState>,
657 headers: HeaderMap,
658 Query(auth): Query<AuthQuery>,
659 Json(request): Json<ComposeSessionPathRequest>,
660) -> Result<Json<serde_json::Value>, BridgeError> {
661 ensure_authorized(&headers, auth.token.as_deref(), &state.config.auth_token)?;
662 let session = load_compose_session(Path::new(&request.draft_path))?;
663 Ok(Json(json!({ "session": session })))
664}
665
666async fn update_compose_session(
667 State(state): State<AppState>,
668 headers: HeaderMap,
669 Query(auth): Query<AuthQuery>,
670 Json(request): Json<ComposeSessionUpdateRequest>,
671) -> Result<Json<serde_json::Value>, BridgeError> {
672 ensure_authorized(&headers, auth.token.as_deref(), &state.config.auth_token)?;
673 let path = Path::new(&request.draft_path);
674 let content =
675 std::fs::read_to_string(path).map_err(|error| BridgeError::Ipc(error.to_string()))?;
676 let (_existing_frontmatter, body) =
677 parse_compose_file(&content).map_err(|error| BridgeError::Ipc(error.to_string()))?;
678 let context = extract_compose_context(&content);
679 let updated = ComposeFrontmatter {
680 to: request.to,
681 cc: request.cc,
682 bcc: request.bcc,
683 subject: request.subject,
684 from: request.from,
685 in_reply_to: extract_in_reply_to(&content)?,
686 references: extract_references(&content)?,
687 attach: request.attach,
688 };
689 let rendered = render_compose_file(&updated, &body, context.as_deref())
690 .map_err(|error| BridgeError::Ipc(error.to_string()))?;
691 std::fs::write(path, rendered).map_err(|error| BridgeError::Ipc(error.to_string()))?;
692 let session = load_compose_session(path)?;
693 Ok(Json(json!({ "session": session })))
694}
695
696async fn send_compose_session(
697 State(state): State<AppState>,
698 headers: HeaderMap,
699 Query(auth): Query<AuthQuery>,
700 Json(request): Json<ComposeSessionSendRequest>,
701) -> Result<Json<serde_json::Value>, BridgeError> {
702 ensure_authorized(&headers, auth.token.as_deref(), &state.config.auth_token)?;
703 let draft = compose_draft_from_file(&request.draft_path, &request.account_id)?;
704 let _ = ack_request(&state.config.socket_path, Request::SendDraft { draft }).await?;
705 let _ = std::fs::remove_file(&request.draft_path);
706 Ok(Json(json!({ "ok": true })))
707}
708
709async fn save_compose_session(
710 State(state): State<AppState>,
711 headers: HeaderMap,
712 Query(auth): Query<AuthQuery>,
713 Json(request): Json<ComposeSessionSendRequest>,
714) -> Result<Json<serde_json::Value>, BridgeError> {
715 ensure_authorized(&headers, auth.token.as_deref(), &state.config.auth_token)?;
716 let draft = compose_draft_from_file(&request.draft_path, &request.account_id)?;
717 let _ = ack_request(
718 &state.config.socket_path,
719 Request::SaveDraftToServer { draft },
720 )
721 .await?;
722 Ok(Json(json!({ "ok": true })))
723}
724
725async fn discard_compose_session(
726 State(state): State<AppState>,
727 headers: HeaderMap,
728 Query(auth): Query<AuthQuery>,
729 Json(request): Json<ComposeSessionPathRequest>,
730) -> Result<Json<serde_json::Value>, BridgeError> {
731 ensure_authorized(&headers, auth.token.as_deref(), &state.config.auth_token)?;
732 let _ = std::fs::remove_file(&request.draft_path);
733 Ok(Json(json!({ "ok": true })))
734}
735
736async fn snooze_presets(
737 State(state): State<AppState>,
738 headers: HeaderMap,
739 Query(auth): Query<AuthQuery>,
740) -> Result<Json<serde_json::Value>, BridgeError> {
741 ensure_authorized(&headers, auth.token.as_deref(), &state.config.auth_token)?;
742 let config = load_config().unwrap_or_default().snooze;
743 let presets = [
744 build_snooze_preset("tomorrow", "Tomorrow morning", &config),
745 build_snooze_preset("tonight", "Tonight", &config),
746 build_snooze_preset("weekend", "Weekend", &config),
747 build_snooze_preset("monday", "Next Monday", &config),
748 ];
749 Ok(Json(json!({ "presets": presets })))
750}
751
752async fn snooze(
753 State(state): State<AppState>,
754 headers: HeaderMap,
755 Query(auth): Query<AuthQuery>,
756 Json(request): Json<SnoozeRequest>,
757) -> Result<Json<serde_json::Value>, BridgeError> {
758 ensure_authorized(&headers, auth.token.as_deref(), &state.config.auth_token)?;
759 let config = load_config().unwrap_or_default().snooze;
760 let wake_at = resolve_snooze_until(&request.until, &config)?;
761 let _ = ack_request(
762 &state.config.socket_path,
763 Request::Snooze {
764 message_id: parse_message_id(&request.message_id)?,
765 wake_at,
766 },
767 )
768 .await?;
769 Ok(Json(json!({ "ok": true, "wake_at": wake_at })))
770}
771
772async fn unsubscribe(
773 State(state): State<AppState>,
774 headers: HeaderMap,
775 Query(auth): Query<AuthQuery>,
776 Json(request): Json<UnsubscribeRequest>,
777) -> Result<Json<serde_json::Value>, BridgeError> {
778 ensure_authorized(&headers, auth.token.as_deref(), &state.config.auth_token)?;
779 let _ = ack_request(
780 &state.config.socket_path,
781 Request::Unsubscribe {
782 message_id: parse_message_id(&request.message_id)?,
783 },
784 )
785 .await?;
786 Ok(Json(json!({ "ok": true })))
787}
788
789async fn open_attachment(
790 State(state): State<AppState>,
791 headers: HeaderMap,
792 Query(auth): Query<AuthQuery>,
793 Json(request): Json<AttachmentRequest>,
794) -> Result<Json<serde_json::Value>, BridgeError> {
795 ensure_authorized(&headers, auth.token.as_deref(), &state.config.auth_token)?;
796 match ipc_request(
797 &state.config.socket_path,
798 Request::OpenAttachment {
799 message_id: parse_message_id(&request.message_id)?,
800 attachment_id: parse_attachment_id(&request.attachment_id)?,
801 },
802 )
803 .await?
804 {
805 ResponseData::AttachmentFile { file } => Ok(Json(json!({ "file": file }))),
806 _ => Err(BridgeError::UnexpectedResponse),
807 }
808}
809
810async fn download_attachment(
811 State(state): State<AppState>,
812 headers: HeaderMap,
813 Query(auth): Query<AuthQuery>,
814 Json(request): Json<AttachmentRequest>,
815) -> Result<Json<serde_json::Value>, BridgeError> {
816 ensure_authorized(&headers, auth.token.as_deref(), &state.config.auth_token)?;
817 match ipc_request(
818 &state.config.socket_path,
819 Request::DownloadAttachment {
820 message_id: parse_message_id(&request.message_id)?,
821 attachment_id: parse_attachment_id(&request.attachment_id)?,
822 },
823 )
824 .await?
825 {
826 ResponseData::AttachmentFile { file } => Ok(Json(json!({ "file": file }))),
827 _ => Err(BridgeError::UnexpectedResponse),
828 }
829}
830
831async fn rules(
832 State(state): State<AppState>,
833 headers: HeaderMap,
834 Query(auth): Query<AuthQuery>,
835) -> Result<Json<serde_json::Value>, BridgeError> {
836 ensure_authorized(&headers, auth.token.as_deref(), &state.config.auth_token)?;
837 match ipc_request(&state.config.socket_path, Request::ListRules).await? {
838 ResponseData::Rules { rules } => Ok(Json(json!({ "rules": rules }))),
839 _ => Err(BridgeError::UnexpectedResponse),
840 }
841}
842
843async fn rule_detail(
844 State(state): State<AppState>,
845 headers: HeaderMap,
846 Query(query): Query<RuleQuery>,
847) -> Result<Json<serde_json::Value>, BridgeError> {
848 ensure_authorized(&headers, query.token.as_deref(), &state.config.auth_token)?;
849 match ipc_request(
850 &state.config.socket_path,
851 Request::GetRule {
852 rule: query.rule.clone(),
853 },
854 )
855 .await?
856 {
857 ResponseData::RuleData { rule } => Ok(Json(json!({ "rule": rule }))),
858 _ => Err(BridgeError::UnexpectedResponse),
859 }
860}
861
862async fn rule_form(
863 State(state): State<AppState>,
864 headers: HeaderMap,
865 Query(query): Query<RuleQuery>,
866) -> Result<Json<serde_json::Value>, BridgeError> {
867 ensure_authorized(&headers, query.token.as_deref(), &state.config.auth_token)?;
868 match ipc_request(
869 &state.config.socket_path,
870 Request::GetRuleForm {
871 rule: query.rule.clone(),
872 },
873 )
874 .await?
875 {
876 ResponseData::RuleFormData { form } => Ok(Json(json!({ "form": form }))),
877 _ => Err(BridgeError::UnexpectedResponse),
878 }
879}
880
881async fn rule_history(
882 State(state): State<AppState>,
883 headers: HeaderMap,
884 Query(query): Query<RuleQuery>,
885) -> Result<Json<serde_json::Value>, BridgeError> {
886 ensure_authorized(&headers, query.token.as_deref(), &state.config.auth_token)?;
887 match ipc_request(
888 &state.config.socket_path,
889 Request::ListRuleHistory {
890 rule: Some(query.rule.clone()),
891 limit: 20,
892 },
893 )
894 .await?
895 {
896 ResponseData::RuleHistory { entries } => Ok(Json(json!({ "entries": entries }))),
897 _ => Err(BridgeError::UnexpectedResponse),
898 }
899}
900
901async fn rule_dry_run(
902 State(state): State<AppState>,
903 headers: HeaderMap,
904 Query(query): Query<RuleQuery>,
905) -> Result<Json<serde_json::Value>, BridgeError> {
906 ensure_authorized(&headers, query.token.as_deref(), &state.config.auth_token)?;
907 match ipc_request(
908 &state.config.socket_path,
909 Request::DryRunRules {
910 rule: Some(query.rule.clone()),
911 all: false,
912 after: None,
913 },
914 )
915 .await?
916 {
917 ResponseData::RuleDryRun { results } => Ok(Json(json!({ "results": results }))),
918 _ => Err(BridgeError::UnexpectedResponse),
919 }
920}
921
922async fn upsert_rule(
923 State(state): State<AppState>,
924 headers: HeaderMap,
925 Query(auth): Query<AuthQuery>,
926 Json(request): Json<UpsertRuleRequest>,
927) -> Result<Json<serde_json::Value>, BridgeError> {
928 ensure_authorized(&headers, auth.token.as_deref(), &state.config.auth_token)?;
929 match ipc_request(
930 &state.config.socket_path,
931 Request::UpsertRule { rule: request.rule },
932 )
933 .await?
934 {
935 ResponseData::RuleData { rule } => Ok(Json(json!({ "rule": rule }))),
936 _ => Err(BridgeError::UnexpectedResponse),
937 }
938}
939
940async fn upsert_rule_form(
941 State(state): State<AppState>,
942 headers: HeaderMap,
943 Query(auth): Query<AuthQuery>,
944 Json(request): Json<UpsertRuleFormRequest>,
945) -> Result<Json<serde_json::Value>, BridgeError> {
946 ensure_authorized(&headers, auth.token.as_deref(), &state.config.auth_token)?;
947 match ipc_request(
948 &state.config.socket_path,
949 Request::UpsertRuleForm {
950 existing_rule: request.existing_rule,
951 name: request.name,
952 condition: request.condition,
953 action: request.action,
954 priority: request.priority,
955 enabled: request.enabled,
956 },
957 )
958 .await?
959 {
960 ResponseData::RuleData { rule } => Ok(Json(json!({ "rule": rule }))),
961 _ => Err(BridgeError::UnexpectedResponse),
962 }
963}
964
965async fn delete_rule(
966 State(state): State<AppState>,
967 headers: HeaderMap,
968 Query(auth): Query<AuthQuery>,
969 Json(request): Json<DeleteRuleRequest>,
970) -> Result<Json<serde_json::Value>, BridgeError> {
971 ensure_authorized(&headers, auth.token.as_deref(), &state.config.auth_token)?;
972 let _ = ack_request(
973 &state.config.socket_path,
974 Request::DeleteRule { rule: request.rule },
975 )
976 .await?;
977 Ok(Json(json!({ "ok": true })))
978}
979
980async fn accounts(
981 State(state): State<AppState>,
982 headers: HeaderMap,
983 Query(auth): Query<AuthQuery>,
984) -> Result<Json<serde_json::Value>, BridgeError> {
985 ensure_authorized(&headers, auth.token.as_deref(), &state.config.auth_token)?;
986 match ipc_request(&state.config.socket_path, Request::ListAccounts).await? {
987 ResponseData::Accounts { accounts } => Ok(Json(json!({ "accounts": accounts }))),
988 _ => Err(BridgeError::UnexpectedResponse),
989 }
990}
991
992async fn test_account(
993 State(state): State<AppState>,
994 headers: HeaderMap,
995 Query(auth): Query<AuthQuery>,
996 Json(account): Json<mxr_protocol::AccountConfigData>,
997) -> Result<Json<serde_json::Value>, BridgeError> {
998 ensure_authorized(&headers, auth.token.as_deref(), &state.config.auth_token)?;
999 match ipc_request(
1000 &state.config.socket_path,
1001 Request::TestAccountConfig { account },
1002 )
1003 .await?
1004 {
1005 ResponseData::AccountOperation { result } => Ok(Json(json!({ "result": result }))),
1006 _ => Err(BridgeError::UnexpectedResponse),
1007 }
1008}
1009
1010async fn upsert_account(
1011 State(state): State<AppState>,
1012 headers: HeaderMap,
1013 Query(auth): Query<AuthQuery>,
1014 Json(account): Json<mxr_protocol::AccountConfigData>,
1015) -> Result<Json<serde_json::Value>, BridgeError> {
1016 ensure_authorized(&headers, auth.token.as_deref(), &state.config.auth_token)?;
1017 let result = run_account_save_workflow(&state.config.socket_path, account).await?;
1018 Ok(Json(json!({ "result": result })))
1019}
1020
1021async fn set_default_account(
1022 State(state): State<AppState>,
1023 headers: HeaderMap,
1024 Query(auth): Query<AuthQuery>,
1025 Json(request): Json<SetDefaultAccountRequest>,
1026) -> Result<Json<serde_json::Value>, BridgeError> {
1027 ensure_authorized(&headers, auth.token.as_deref(), &state.config.auth_token)?;
1028 match ipc_request(
1029 &state.config.socket_path,
1030 Request::SetDefaultAccount { key: request.key },
1031 )
1032 .await?
1033 {
1034 ResponseData::AccountOperation { result } => Ok(Json(json!({ "result": result }))),
1035 _ => Err(BridgeError::UnexpectedResponse),
1036 }
1037}
1038
1039async fn diagnostics(
1040 State(state): State<AppState>,
1041 headers: HeaderMap,
1042 Query(auth): Query<AuthQuery>,
1043) -> Result<Json<serde_json::Value>, BridgeError> {
1044 ensure_authorized(&headers, auth.token.as_deref(), &state.config.auth_token)?;
1045 match ipc_request(&state.config.socket_path, Request::GetDoctorReport).await? {
1046 ResponseData::DoctorReport { report } => Ok(Json(json!({ "report": report }))),
1047 _ => Err(BridgeError::UnexpectedResponse),
1048 }
1049}
1050
1051async fn generate_bug_report(
1052 State(state): State<AppState>,
1053 headers: HeaderMap,
1054 Query(auth): Query<AuthQuery>,
1055) -> Result<Json<serde_json::Value>, BridgeError> {
1056 ensure_authorized(&headers, auth.token.as_deref(), &state.config.auth_token)?;
1057 match ipc_request(
1058 &state.config.socket_path,
1059 Request::GenerateBugReport {
1060 verbose: false,
1061 full_logs: false,
1062 since: None,
1063 },
1064 )
1065 .await?
1066 {
1067 ResponseData::BugReport { content } => Ok(Json(json!({ "content": content }))),
1068 _ => Err(BridgeError::UnexpectedResponse),
1069 }
1070}
1071
1072async fn archive(
1073 State(state): State<AppState>,
1074 headers: HeaderMap,
1075 Query(auth): Query<AuthQuery>,
1076 Json(request): Json<MessageIdsRequest>,
1077) -> Result<Json<serde_json::Value>, BridgeError> {
1078 ensure_authorized(&headers, auth.token.as_deref(), &state.config.auth_token)?;
1079 ack_mutation(
1080 &state.config.socket_path,
1081 mxr_protocol::MutationCommand::Archive {
1082 message_ids: parse_message_ids(&request.message_ids)?,
1083 },
1084 )
1085 .await
1086}
1087
1088async fn trash(
1089 State(state): State<AppState>,
1090 headers: HeaderMap,
1091 Query(auth): Query<AuthQuery>,
1092 Json(request): Json<MessageIdsRequest>,
1093) -> Result<Json<serde_json::Value>, BridgeError> {
1094 ensure_authorized(&headers, auth.token.as_deref(), &state.config.auth_token)?;
1095 ack_mutation(
1096 &state.config.socket_path,
1097 mxr_protocol::MutationCommand::Trash {
1098 message_ids: parse_message_ids(&request.message_ids)?,
1099 },
1100 )
1101 .await
1102}
1103
1104async fn spam(
1105 State(state): State<AppState>,
1106 headers: HeaderMap,
1107 Query(auth): Query<AuthQuery>,
1108 Json(request): Json<MessageIdsRequest>,
1109) -> Result<Json<serde_json::Value>, BridgeError> {
1110 ensure_authorized(&headers, auth.token.as_deref(), &state.config.auth_token)?;
1111 ack_mutation(
1112 &state.config.socket_path,
1113 mxr_protocol::MutationCommand::Spam {
1114 message_ids: parse_message_ids(&request.message_ids)?,
1115 },
1116 )
1117 .await
1118}
1119
1120async fn star(
1121 State(state): State<AppState>,
1122 headers: HeaderMap,
1123 Query(auth): Query<AuthQuery>,
1124 Json(request): Json<StarRequest>,
1125) -> Result<Json<serde_json::Value>, BridgeError> {
1126 ensure_authorized(&headers, auth.token.as_deref(), &state.config.auth_token)?;
1127 ack_mutation(
1128 &state.config.socket_path,
1129 mxr_protocol::MutationCommand::Star {
1130 message_ids: parse_message_ids(&request.message_ids)?,
1131 starred: request.starred,
1132 },
1133 )
1134 .await
1135}
1136
1137async fn mark_read(
1138 State(state): State<AppState>,
1139 headers: HeaderMap,
1140 Query(auth): Query<AuthQuery>,
1141 Json(request): Json<ReadRequest>,
1142) -> Result<Json<serde_json::Value>, BridgeError> {
1143 ensure_authorized(&headers, auth.token.as_deref(), &state.config.auth_token)?;
1144 ack_mutation(
1145 &state.config.socket_path,
1146 mxr_protocol::MutationCommand::SetRead {
1147 message_ids: parse_message_ids(&request.message_ids)?,
1148 read: request.read,
1149 },
1150 )
1151 .await
1152}
1153
1154async fn mark_read_and_archive(
1155 State(state): State<AppState>,
1156 headers: HeaderMap,
1157 Query(auth): Query<AuthQuery>,
1158 Json(request): Json<MessageIdsRequest>,
1159) -> Result<Json<serde_json::Value>, BridgeError> {
1160 ensure_authorized(&headers, auth.token.as_deref(), &state.config.auth_token)?;
1161 ack_mutation(
1162 &state.config.socket_path,
1163 mxr_protocol::MutationCommand::ReadAndArchive {
1164 message_ids: parse_message_ids(&request.message_ids)?,
1165 },
1166 )
1167 .await
1168}
1169
1170async fn modify_labels(
1171 State(state): State<AppState>,
1172 headers: HeaderMap,
1173 Query(auth): Query<AuthQuery>,
1174 Json(request): Json<ModifyLabelsRequest>,
1175) -> Result<Json<serde_json::Value>, BridgeError> {
1176 ensure_authorized(&headers, auth.token.as_deref(), &state.config.auth_token)?;
1177 ack_mutation(
1178 &state.config.socket_path,
1179 mxr_protocol::MutationCommand::ModifyLabels {
1180 message_ids: parse_message_ids(&request.message_ids)?,
1181 add: request.add,
1182 remove: request.remove,
1183 },
1184 )
1185 .await
1186}
1187
1188async fn move_messages(
1189 State(state): State<AppState>,
1190 headers: HeaderMap,
1191 Query(auth): Query<AuthQuery>,
1192 Json(request): Json<MoveRequest>,
1193) -> Result<Json<serde_json::Value>, BridgeError> {
1194 ensure_authorized(&headers, auth.token.as_deref(), &state.config.auth_token)?;
1195 ack_mutation(
1196 &state.config.socket_path,
1197 mxr_protocol::MutationCommand::Move {
1198 message_ids: parse_message_ids(&request.message_ids)?,
1199 target_label: request.target_label,
1200 },
1201 )
1202 .await
1203}
1204
1205async fn events(
1206 ws: WebSocketUpgrade,
1207 State(state): State<AppState>,
1208 headers: HeaderMap,
1209 Query(auth): Query<AuthQuery>,
1210) -> impl IntoResponse {
1211 if let Err(error) = ensure_authorized(&headers, auth.token.as_deref(), &state.config.auth_token)
1212 {
1213 return error.into_response();
1214 }
1215 ws.on_upgrade(move |socket| bridge_events(socket, state.config.socket_path))
1216}
1217
1218async fn ipc_request(socket_path: &Path, request: Request) -> Result<ResponseData, BridgeError> {
1219 let stream = UnixStream::connect(socket_path)
1220 .await
1221 .map_err(|error| BridgeError::Connect(error.to_string()))?;
1222 let mut framed = Framed::new(stream, IpcCodec::new());
1223 let message = IpcMessage {
1224 id: 1,
1225 payload: IpcPayload::Request(request),
1226 };
1227 framed
1228 .send(message)
1229 .await
1230 .map_err(|error| BridgeError::Ipc(error.to_string()))?;
1231
1232 loop {
1233 match framed.next().await {
1234 Some(Ok(response)) => match response.payload {
1235 IpcPayload::Response(mxr_protocol::Response::Ok { data }) => return Ok(data),
1236 IpcPayload::Response(mxr_protocol::Response::Error { message }) => {
1237 return Err(BridgeError::Ipc(message));
1238 }
1239 IpcPayload::Event(_) => continue,
1240 _ => return Err(BridgeError::UnexpectedResponse),
1241 },
1242 Some(Err(error)) => return Err(BridgeError::Ipc(error.to_string())),
1243 None => return Err(BridgeError::Ipc("connection closed".into())),
1244 }
1245 }
1246}
1247
1248async fn bridge_events(mut socket: WebSocket, socket_path: PathBuf) {
1249 let stream = match UnixStream::connect(&socket_path).await {
1250 Ok(stream) => stream,
1251 Err(error) => {
1252 let _ = socket
1253 .send(WebSocketMessage::Text(
1254 serde_json::json!({ "error": error.to_string() })
1255 .to_string()
1256 .into(),
1257 ))
1258 .await;
1259 return;
1260 }
1261 };
1262 let mut framed = Framed::new(stream, IpcCodec::new());
1263
1264 loop {
1265 match framed.next().await {
1266 Some(Ok(message)) => match message.payload {
1267 IpcPayload::Event(event) => {
1268 let payload = match serde_json::to_string(&event) {
1269 Ok(payload) => payload,
1270 Err(_) => break,
1271 };
1272 if socket
1273 .send(WebSocketMessage::Text(payload.into()))
1274 .await
1275 .is_err()
1276 {
1277 break;
1278 }
1279 }
1280 _ => continue,
1281 },
1282 Some(Err(_)) | None => break,
1283 }
1284 }
1285}
1286
1287fn parse_thread_id(value: &str) -> Result<ThreadId, BridgeError> {
1288 Uuid::parse_str(value)
1289 .map(ThreadId::from_uuid)
1290 .map_err(|_| BridgeError::Ipc(format!("invalid thread id: {value}")))
1291}
1292
1293fn parse_message_id(value: &str) -> Result<MessageId, BridgeError> {
1294 Uuid::parse_str(value)
1295 .map(MessageId::from_uuid)
1296 .map_err(|_| BridgeError::Ipc(format!("invalid message id: {value}")))
1297}
1298
1299fn parse_attachment_id(value: &str) -> Result<mxr_core::AttachmentId, BridgeError> {
1300 Uuid::parse_str(value)
1301 .map(mxr_core::AttachmentId::from_uuid)
1302 .map_err(|_| BridgeError::Ipc(format!("invalid attachment id: {value}")))
1303}
1304
1305fn parse_message_ids(values: &[String]) -> Result<Vec<MessageId>, BridgeError> {
1306 values
1307 .iter()
1308 .map(|value| parse_message_id(value))
1309 .collect::<Result<Vec<_>, _>>()
1310}
1311
1312fn parse_account_id(value: &str) -> Result<AccountId, BridgeError> {
1313 Uuid::parse_str(value)
1314 .map(AccountId::from_uuid)
1315 .map_err(|_| BridgeError::Ipc(format!("invalid account id: {value}")))
1316}
1317
1318fn parse_label_id(value: &str) -> Result<LabelId, BridgeError> {
1319 Uuid::parse_str(value)
1320 .map(LabelId::from_uuid)
1321 .map_err(|_| BridgeError::Ipc(format!("invalid label id: {value}")))
1322}
1323
1324async fn create_compose_session(
1325 socket_path: &Path,
1326 request: ComposeSessionStartRequest,
1327) -> Result<serde_json::Value, BridgeError> {
1328 let (account_id, from) = default_account(socket_path).await?;
1329 let (kind, account_id, cursor_line) = match request.kind {
1330 ComposeSessionKindRequest::New => (
1331 request
1332 .to
1333 .map(|to| ComposeKind::NewWithTo { to })
1334 .unwrap_or(ComposeKind::New),
1335 account_id,
1336 None::<usize>,
1337 ),
1338 ComposeSessionKindRequest::Reply | ComposeSessionKindRequest::ReplyAll => {
1339 let message_id = request
1340 .message_id
1341 .as_deref()
1342 .ok_or_else(|| BridgeError::Ipc("compose reply missing message_id".into()))?;
1343 let envelope = envelope_for_message(socket_path, message_id).await?;
1344 let response = ipc_request(
1345 socket_path,
1346 Request::PrepareReply {
1347 message_id: envelope.id.clone(),
1348 reply_all: matches!(request.kind, ComposeSessionKindRequest::ReplyAll),
1349 },
1350 )
1351 .await?;
1352 let context = match response {
1353 ResponseData::ReplyContext { context } => context,
1354 _ => return Err(BridgeError::UnexpectedResponse),
1355 };
1356 (
1357 ComposeKind::Reply {
1358 in_reply_to: context.in_reply_to,
1359 references: context.references,
1360 to: context.reply_to,
1361 cc: context.cc,
1362 subject: context.subject,
1363 thread_context: context.thread_context,
1364 },
1365 envelope.account_id,
1366 None,
1367 )
1368 }
1369 ComposeSessionKindRequest::Forward => {
1370 let message_id = request
1371 .message_id
1372 .as_deref()
1373 .ok_or_else(|| BridgeError::Ipc("compose forward missing message_id".into()))?;
1374 let envelope = envelope_for_message(socket_path, message_id).await?;
1375 let response = ipc_request(
1376 socket_path,
1377 Request::PrepareForward {
1378 message_id: envelope.id.clone(),
1379 },
1380 )
1381 .await?;
1382 let context = match response {
1383 ResponseData::ForwardContext { context } => context,
1384 _ => return Err(BridgeError::UnexpectedResponse),
1385 };
1386 (
1387 ComposeKind::Forward {
1388 subject: context.subject,
1389 original_context: context.forwarded_content,
1390 },
1391 envelope.account_id,
1392 None,
1393 )
1394 }
1395 };
1396
1397 let account = account_summary(socket_path, &account_id).await?;
1398 let compose_from = if from.trim().is_empty() {
1399 account.email.clone()
1400 } else {
1401 from
1402 };
1403 let (draft_path, resolved_cursor_line) = mxr_compose::create_draft_file(kind, &compose_from)
1404 .map_err(|error| BridgeError::Ipc(error.to_string()))?;
1405 let mut session = load_compose_session(&draft_path)?;
1406 if let Some(cursor_line) = cursor_line {
1407 session["cursorLine"] = json!(cursor_line);
1408 } else {
1409 session["cursorLine"] = json!(resolved_cursor_line);
1410 }
1411 session["accountId"] = json!(account.account_id);
1412 session["kind"] = json!(compose_kind_name(&request.kind));
1413 session["editorCommand"] = json!(resolved_editor_command());
1414 Ok(session)
1415}
1416
1417fn compose_kind_name(kind: &ComposeSessionKindRequest) -> &'static str {
1418 match kind {
1419 ComposeSessionKindRequest::New => "new",
1420 ComposeSessionKindRequest::Reply => "reply",
1421 ComposeSessionKindRequest::ReplyAll => "reply_all",
1422 ComposeSessionKindRequest::Forward => "forward",
1423 }
1424}
1425
1426fn load_compose_session(path: &Path) -> Result<serde_json::Value, BridgeError> {
1427 let raw_content =
1428 std::fs::read_to_string(path).map_err(|error| BridgeError::Ipc(error.to_string()))?;
1429 let (frontmatter, body) =
1430 parse_compose_file(&raw_content).map_err(|error| BridgeError::Ipc(error.to_string()))?;
1431 let rendered = render_markdown(&body);
1432 let issues = validate_draft(&frontmatter, &body)
1433 .into_iter()
1434 .map(compose_issue_view)
1435 .collect::<Vec<_>>();
1436 Ok(json!({
1437 "draftPath": path.display().to_string(),
1438 "rawContent": raw_content,
1439 "frontmatter": frontmatter,
1440 "bodyMarkdown": body,
1441 "previewHtml": rendered.html,
1442 "issues": issues,
1443 }))
1444}
1445
1446fn compose_issue_view(issue: ComposeValidation) -> ComposeIssueView {
1447 match issue {
1448 ComposeValidation::Error(message) => ComposeIssueView {
1449 severity: "error",
1450 message,
1451 },
1452 ComposeValidation::Warning(message) => ComposeIssueView {
1453 severity: "warning",
1454 message,
1455 },
1456 }
1457}
1458
1459fn extract_compose_context(content: &str) -> Option<String> {
1460 const CONTEXT_MARKER: &str = "# --- context (stripped before sending) ---";
1461 let marker_index = content.find(CONTEXT_MARKER)?;
1462 let lines = content[marker_index + CONTEXT_MARKER.len()..]
1463 .lines()
1464 .map(|line| {
1465 line.strip_prefix("# ")
1466 .or_else(|| line.strip_prefix('#'))
1467 .unwrap_or(line)
1468 })
1469 .map(str::trim_end)
1470 .collect::<Vec<_>>();
1471 let context = lines.join("\n").trim().to_string();
1472 if context.is_empty() {
1473 None
1474 } else {
1475 Some(context)
1476 }
1477}
1478
1479fn extract_in_reply_to(content: &str) -> Result<Option<String>, BridgeError> {
1480 let (frontmatter, _) =
1481 parse_compose_file(content).map_err(|error| BridgeError::Ipc(error.to_string()))?;
1482 Ok(frontmatter.in_reply_to)
1483}
1484
1485fn extract_references(content: &str) -> Result<Vec<String>, BridgeError> {
1486 let (frontmatter, _) =
1487 parse_compose_file(content).map_err(|error| BridgeError::Ipc(error.to_string()))?;
1488 Ok(frontmatter.references)
1489}
1490
1491fn compose_draft_from_file(draft_path: &str, account_id: &str) -> Result<Draft, BridgeError> {
1492 let raw_content =
1493 std::fs::read_to_string(draft_path).map_err(|error| BridgeError::Ipc(error.to_string()))?;
1494 let (frontmatter, body) =
1495 parse_compose_file(&raw_content).map_err(|error| BridgeError::Ipc(error.to_string()))?;
1496 let issues = validate_draft(&frontmatter, &body);
1497 if issues.iter().any(ComposeValidation::is_error) {
1498 let message = issues
1499 .into_iter()
1500 .map(|issue| issue.to_string())
1501 .collect::<Vec<_>>()
1502 .join("; ");
1503 return Err(BridgeError::Ipc(format!("Draft errors: {message}")));
1504 }
1505
1506 let now = Utc::now();
1507 Ok(Draft {
1508 id: DraftId::new(),
1509 account_id: parse_account_id(account_id)?,
1510 reply_headers: frontmatter
1511 .in_reply_to
1512 .as_ref()
1513 .map(|in_reply_to| ReplyHeaders {
1514 in_reply_to: in_reply_to.clone(),
1515 references: frontmatter.references.clone(),
1516 }),
1517 to: parse_address_list(&frontmatter.to),
1518 cc: parse_address_list(&frontmatter.cc),
1519 bcc: parse_address_list(&frontmatter.bcc),
1520 subject: frontmatter.subject,
1521 body_markdown: body,
1522 attachments: frontmatter.attach.into_iter().map(PathBuf::from).collect(),
1523 created_at: now,
1524 updated_at: now,
1525 })
1526}
1527
1528async fn default_account(socket_path: &Path) -> Result<(AccountId, String), BridgeError> {
1529 let mut accounts = match ipc_request(socket_path, Request::ListAccounts).await? {
1530 ResponseData::Accounts { accounts } => accounts,
1531 _ => return Err(BridgeError::UnexpectedResponse),
1532 };
1533 if accounts.is_empty() {
1534 return Err(BridgeError::Ipc("No runtime account configured".into()));
1535 }
1536 let index = accounts
1537 .iter()
1538 .position(|account| account.is_default)
1539 .unwrap_or(0);
1540 let account = accounts.swap_remove(index);
1541 Ok((account.account_id, account.email))
1542}
1543
1544async fn account_summary(
1545 socket_path: &Path,
1546 account_id: &AccountId,
1547) -> Result<mxr_protocol::AccountSummaryData, BridgeError> {
1548 match ipc_request(socket_path, Request::ListAccounts).await? {
1549 ResponseData::Accounts { accounts } => accounts
1550 .into_iter()
1551 .find(|account| &account.account_id == account_id)
1552 .ok_or_else(|| BridgeError::Ipc("Account not found for compose session".into())),
1553 _ => Err(BridgeError::UnexpectedResponse),
1554 }
1555}
1556
1557async fn envelope_for_message(
1558 socket_path: &Path,
1559 message_id: &str,
1560) -> Result<Envelope, BridgeError> {
1561 match ipc_request(
1562 socket_path,
1563 Request::GetEnvelope {
1564 message_id: parse_message_id(message_id)?,
1565 },
1566 )
1567 .await?
1568 {
1569 ResponseData::Envelope { envelope } => Ok(envelope),
1570 _ => Err(BridgeError::UnexpectedResponse),
1571 }
1572}
1573
1574fn resolved_editor_command() -> String {
1575 std::env::var("EDITOR")
1576 .or_else(|_| std::env::var("VISUAL"))
1577 .unwrap_or_else(|_| "vi".to_string())
1578}
1579
1580async fn request_account_operation(
1581 socket_path: &Path,
1582 request: Request,
1583) -> Result<mxr_protocol::AccountOperationResult, BridgeError> {
1584 match ipc_request(socket_path, request).await? {
1585 ResponseData::AccountOperation { result } => Ok(result),
1586 _ => Err(BridgeError::UnexpectedResponse),
1587 }
1588}
1589
1590async fn run_account_save_workflow(
1591 socket_path: &Path,
1592 account: mxr_protocol::AccountConfigData,
1593) -> Result<mxr_protocol::AccountOperationResult, BridgeError> {
1594 let mut result = if account
1595 .sync
1596 .as_ref()
1597 .is_some_and(|sync| matches!(sync, mxr_protocol::AccountSyncConfigData::Gmail { .. }))
1598 {
1599 request_account_operation(
1600 socket_path,
1601 Request::AuthorizeAccountConfig {
1602 account: account.clone(),
1603 reauthorize: false,
1604 },
1605 )
1606 .await?
1607 } else {
1608 empty_account_operation_result()
1609 };
1610
1611 if result.auth.as_ref().is_some_and(|step| !step.ok) {
1612 return Ok(result);
1613 }
1614
1615 merge_account_operation_result(
1616 &mut result,
1617 request_account_operation(
1618 socket_path,
1619 Request::UpsertAccountConfig {
1620 account: account.clone(),
1621 },
1622 )
1623 .await?,
1624 );
1625
1626 if result.save.as_ref().is_some_and(|step| !step.ok) {
1627 return Ok(result);
1628 }
1629
1630 merge_account_operation_result(
1631 &mut result,
1632 request_account_operation(socket_path, Request::TestAccountConfig { account }).await?,
1633 );
1634
1635 Ok(result)
1636}
1637
1638fn empty_account_operation_result() -> mxr_protocol::AccountOperationResult {
1639 mxr_protocol::AccountOperationResult {
1640 ok: true,
1641 summary: String::new(),
1642 save: None,
1643 auth: None,
1644 sync: None,
1645 send: None,
1646 }
1647}
1648
1649fn merge_account_operation_result(
1650 base: &mut mxr_protocol::AccountOperationResult,
1651 next: mxr_protocol::AccountOperationResult,
1652) {
1653 base.ok &= next.ok;
1654 if !next.summary.is_empty() {
1655 base.summary = next.summary;
1656 }
1657 if next.save.is_some() {
1658 base.save = next.save;
1659 }
1660 if next.auth.is_some() {
1661 base.auth = next.auth;
1662 }
1663 if next.sync.is_some() {
1664 base.sync = next.sync;
1665 }
1666 if next.send.is_some() {
1667 base.send = next.send;
1668 }
1669}
1670
1671fn build_snooze_preset(
1672 name: &str,
1673 label: &str,
1674 config: &mxr_config::SnoozeConfig,
1675) -> serde_json::Value {
1676 let wake_at = resolve_snooze_until(name, config).unwrap_or_else(|_| Utc::now());
1677 json!({
1678 "id": name,
1679 "label": label,
1680 "wakeAt": wake_at,
1681 })
1682}
1683
1684fn resolve_snooze_until(
1685 until: &str,
1686 config: &mxr_config::SnoozeConfig,
1687) -> Result<DateTime<Utc>, BridgeError> {
1688 use chrono::{Datelike, Duration, NaiveTime, Weekday};
1689
1690 let now = Local::now();
1691 let lower = until.trim().to_ascii_lowercase();
1692 let wake_at = match lower.as_str() {
1693 "tomorrow" | "tomorrow_morning" => {
1694 let tomorrow = now.date_naive() + Duration::days(1);
1695 let time = NaiveTime::from_hms_opt(config.morning_hour as u32, 0, 0).unwrap();
1696 tomorrow
1697 .and_time(time)
1698 .and_local_timezone(now.timezone())
1699 .single()
1700 .unwrap()
1701 .with_timezone(&Utc)
1702 }
1703 "tonight" => {
1704 let today = now.date_naive();
1705 let time = NaiveTime::from_hms_opt(config.evening_hour as u32, 0, 0).unwrap();
1706 let tonight = today
1707 .and_time(time)
1708 .and_local_timezone(now.timezone())
1709 .single()
1710 .unwrap()
1711 .with_timezone(&Utc);
1712 if tonight <= Utc::now() {
1713 tonight + Duration::days(1)
1714 } else {
1715 tonight
1716 }
1717 }
1718 "weekend" => {
1719 let target_day = match config.weekend_day.as_str() {
1720 "sunday" => Weekday::Sun,
1721 _ => Weekday::Sat,
1722 };
1723 let days_until = (target_day.num_days_from_monday() as i64
1724 - now.weekday().num_days_from_monday() as i64
1725 + 7)
1726 % 7;
1727 let days = if days_until == 0 { 7 } else { days_until };
1728 let weekend = now.date_naive() + Duration::days(days);
1729 let time = NaiveTime::from_hms_opt(config.weekend_hour as u32, 0, 0).unwrap();
1730 weekend
1731 .and_time(time)
1732 .and_local_timezone(now.timezone())
1733 .single()
1734 .unwrap()
1735 .with_timezone(&Utc)
1736 }
1737 "monday" | "next_monday" => {
1738 let days_until_monday = (Weekday::Mon.num_days_from_monday() as i64
1739 - now.weekday().num_days_from_monday() as i64
1740 + 7)
1741 % 7;
1742 let days = if days_until_monday == 0 {
1743 7
1744 } else {
1745 days_until_monday
1746 };
1747 let monday = now.date_naive() + chrono::Duration::days(days);
1748 let time = NaiveTime::from_hms_opt(config.morning_hour as u32, 0, 0).unwrap();
1749 monday
1750 .and_time(time)
1751 .and_local_timezone(now.timezone())
1752 .single()
1753 .unwrap()
1754 .with_timezone(&Utc)
1755 }
1756 _ => DateTime::parse_from_rfc3339(until)
1757 .map_err(|_| BridgeError::Ipc(format!("invalid snooze time: {until}")))?
1758 .with_timezone(&Utc),
1759 };
1760 Ok(wake_at)
1761}
1762
1763async fn build_bridge_chrome(
1764 socket_path: &Path,
1765 active_lens: &MailboxLensRequest,
1766) -> Result<BridgeChrome, BridgeError> {
1767 let (accounts, total_messages, sync_statuses, repair_required) =
1768 match ipc_request(socket_path, Request::GetStatus).await? {
1769 ResponseData::Status {
1770 accounts,
1771 total_messages,
1772 sync_statuses,
1773 repair_required,
1774 ..
1775 } => (accounts, total_messages, sync_statuses, repair_required),
1776 _ => return Err(BridgeError::UnexpectedResponse),
1777 };
1778
1779 let labels = match ipc_request(socket_path, Request::ListLabels { account_id: None }).await? {
1780 ResponseData::Labels { labels } => labels,
1781 _ => return Err(BridgeError::UnexpectedResponse),
1782 };
1783
1784 let searches = match ipc_request(socket_path, Request::ListSavedSearches).await? {
1785 ResponseData::SavedSearches { searches } => searches,
1786 _ => return Err(BridgeError::UnexpectedResponse),
1787 };
1788
1789 let subscriptions =
1790 match ipc_request(socket_path, Request::ListSubscriptions { limit: 8 }).await? {
1791 ResponseData::Subscriptions { subscriptions } => subscriptions,
1792 _ => return Err(BridgeError::UnexpectedResponse),
1793 };
1794
1795 let sync_label = if sync_statuses.iter().any(|status| status.sync_in_progress) {
1796 "Syncing"
1797 } else if sync_statuses
1798 .iter()
1799 .any(|status| !status.healthy || status.last_error.is_some())
1800 {
1801 "Needs attention"
1802 } else {
1803 "Synced"
1804 };
1805
1806 let status_message = if repair_required {
1807 "Repair required before mailbox opens".to_string()
1808 } else if sync_statuses
1809 .iter()
1810 .any(|status| status.last_error.is_some())
1811 {
1812 "Last sync needs attention".to_string()
1813 } else {
1814 "Local-first and ready".to_string()
1815 };
1816
1817 Ok(BridgeChrome {
1818 shell: json!({
1819 "accountLabel": accounts.first().cloned().unwrap_or_else(|| "local".to_string()),
1820 "syncLabel": sync_label,
1821 "statusMessage": status_message,
1822 "commandHint": "Ctrl-p",
1823 }),
1824 sidebar: json!({ "sections": build_sidebar_sections(&labels, &searches, &subscriptions, total_messages, active_lens) }),
1825 inbox_label_id: find_inbox_label(&labels).map(|label| label.id.clone()),
1826 labels,
1827 searches,
1828 subscriptions,
1829 })
1830}
1831
1832async fn ack_mutation(
1833 socket_path: &Path,
1834 mutation: mxr_protocol::MutationCommand,
1835) -> Result<Json<serde_json::Value>, BridgeError> {
1836 ack_request(socket_path, Request::Mutation(mutation)).await
1837}
1838
1839async fn ack_request(
1840 socket_path: &Path,
1841 request: Request,
1842) -> Result<Json<serde_json::Value>, BridgeError> {
1843 match ipc_request(socket_path, request).await? {
1844 ResponseData::Ack => Ok(Json(serde_json::json!({ "ok": true }))),
1845 _ => Err(BridgeError::UnexpectedResponse),
1846 }
1847}
1848
1849fn find_inbox_label(labels: &[Label]) -> Option<&Label> {
1850 labels
1851 .iter()
1852 .find(|label| matches_system_label(label, "Inbox"))
1853}
1854
1855fn matches_system_label(label: &Label, expected: &str) -> bool {
1856 matches!(label.kind, LabelKind::System) && label.name.eq_ignore_ascii_case(expected)
1857}
1858
1859fn mailbox_counts(labels: &[Label], envelopes: &[Envelope]) -> serde_json::Value {
1860 if let Some(inbox) = find_inbox_label(labels) {
1861 json!({
1862 "unread": inbox.unread_count,
1863 "total": inbox.total_count,
1864 })
1865 } else {
1866 json!({
1867 "unread": envelopes
1868 .iter()
1869 .filter(|envelope| !envelope.flags.contains(MessageFlags::READ))
1870 .count(),
1871 "total": envelopes.len(),
1872 })
1873 }
1874}
1875
1876fn derived_counts(envelopes: &[Envelope]) -> serde_json::Value {
1877 json!({
1878 "unread": envelopes
1879 .iter()
1880 .filter(|envelope| !envelope.flags.contains(MessageFlags::READ))
1881 .count(),
1882 "total": envelopes.len(),
1883 })
1884}
1885
1886fn build_sidebar_sections(
1887 labels: &[Label],
1888 searches: &[SavedSearch],
1889 subscriptions: &[SubscriptionSummary],
1890 total_messages: u32,
1891 active_lens: &MailboxLensRequest,
1892) -> Vec<serde_json::Value> {
1893 let all_mail_total = labels
1894 .iter()
1895 .find(|label| matches_system_label(label, "All Mail"))
1896 .map(|label| label.total_count)
1897 .unwrap_or(total_messages);
1898 let all_mail_unread = labels
1899 .iter()
1900 .find(|label| matches_system_label(label, "All Mail"))
1901 .map(|label| label.unread_count)
1902 .unwrap_or_default();
1903
1904 let mut system_items = Vec::new();
1905 for name in ["Inbox", "Starred", "Sent", "Drafts", "Spam", "Trash"] {
1906 if let Some(label) = labels
1907 .iter()
1908 .find(|label| matches_system_label(label, name))
1909 {
1910 system_items.push(json!({
1911 "id": slugify(&label.name),
1912 "label": label.name,
1913 "unread": label.unread_count,
1914 "total": label.total_count,
1915 "active": active_lens.kind == MailboxLensKind::Label
1916 && active_lens.label_id.as_deref() == Some(&label.id.to_string())
1917 || active_lens.kind == MailboxLensKind::Inbox && name == "Inbox",
1918 "lens": {
1919 "kind": if name == "Inbox" { "inbox" } else { "label" },
1920 "labelId": if name == "Inbox" {
1921 None::<String>
1922 } else {
1923 Some(label.id.to_string())
1924 },
1925 },
1926 }));
1927 }
1928 }
1929 system_items.push(json!({
1930 "id": "all-mail",
1931 "label": "All Mail",
1932 "unread": all_mail_unread,
1933 "total": all_mail_total,
1934 "active": active_lens.kind == MailboxLensKind::AllMail,
1935 "lens": { "kind": "all_mail" },
1936 }));
1937
1938 let user_labels = labels
1939 .iter()
1940 .filter(|label| !matches!(label.kind, LabelKind::System))
1941 .map(|label| {
1942 json!({
1943 "id": slugify(&label.name),
1944 "label": label.name,
1945 "unread": label.unread_count,
1946 "total": label.total_count,
1947 "active": active_lens.kind == MailboxLensKind::Label
1948 && active_lens.label_id.as_deref() == Some(&label.id.to_string()),
1949 "lens": {
1950 "kind": "label",
1951 "labelId": label.id.to_string(),
1952 },
1953 })
1954 })
1955 .collect::<Vec<_>>();
1956
1957 let saved_search_items = sorted_saved_searches(searches.to_vec())
1958 .into_iter()
1959 .map(|search| {
1960 json!({
1961 "id": format!("saved-search-{}", slugify(&search.name)),
1962 "label": search.name,
1963 "unread": 0,
1964 "total": 0,
1965 "active": active_lens.kind == MailboxLensKind::SavedSearch
1966 && active_lens.saved_search.as_deref() == Some(search.name.as_str()),
1967 "lens": {
1968 "kind": "saved_search",
1969 "savedSearch": search.name,
1970 },
1971 })
1972 })
1973 .collect::<Vec<_>>();
1974
1975 system_items.push(json!({
1976 "id": "subscriptions",
1977 "label": "Subscriptions",
1978 "unread": subscriptions
1979 .iter()
1980 .filter(|subscription| !subscription.latest_flags.contains(MessageFlags::READ))
1981 .count(),
1982 "total": subscriptions.len(),
1983 "active": active_lens.kind == MailboxLensKind::Subscription,
1984 "lens": { "kind": "subscription" },
1985 }));
1986
1987 let mut sections = vec![json!({
1988 "id": "system",
1989 "title": "System",
1990 "items": system_items,
1991 })];
1992 if !user_labels.is_empty() {
1993 sections.push(json!({
1994 "id": "labels",
1995 "title": "Labels",
1996 "items": user_labels,
1997 }));
1998 }
1999 if !saved_search_items.is_empty() {
2000 sections.push(json!({
2001 "id": "saved-searches",
2002 "title": "Saved Searches",
2003 "items": saved_search_items,
2004 }));
2005 }
2006 sections
2007}
2008
2009async fn load_mailbox_selection(
2010 socket_path: &Path,
2011 chrome: &BridgeChrome,
2012 lens: &MailboxLensRequest,
2013 limit: u32,
2014 offset: u32,
2015) -> Result<MailboxSelection, BridgeError> {
2016 match lens.kind {
2017 MailboxLensKind::Inbox => {
2018 let envelopes =
2019 list_envelopes(socket_path, chrome.inbox_label_id.clone(), limit, offset).await?;
2020 Ok(MailboxSelection {
2021 lens_label: find_inbox_label(&chrome.labels)
2022 .map(|label| label.name.clone())
2023 .unwrap_or_else(|| "Inbox".to_string()),
2024 counts: mailbox_counts(&chrome.labels, &envelopes),
2025 envelopes,
2026 })
2027 }
2028 MailboxLensKind::AllMail => {
2029 let envelopes = list_envelopes(socket_path, None, limit, offset).await?;
2030 let counts = chrome
2031 .labels
2032 .iter()
2033 .find(|label| matches_system_label(label, "All Mail"))
2034 .map(|label| {
2035 json!({
2036 "unread": label.unread_count,
2037 "total": label.total_count,
2038 })
2039 })
2040 .unwrap_or_else(|| derived_counts(&envelopes));
2041 Ok(MailboxSelection {
2042 lens_label: "All Mail".to_string(),
2043 counts,
2044 envelopes,
2045 })
2046 }
2047 MailboxLensKind::Label => {
2048 let label_id = lens
2049 .label_id
2050 .as_deref()
2051 .ok_or_else(|| BridgeError::Ipc("label lens missing label_id".into()))
2052 .and_then(parse_label_id)?;
2053 let envelopes =
2054 list_envelopes(socket_path, Some(label_id.clone()), limit, offset).await?;
2055 let label = chrome
2056 .labels
2057 .iter()
2058 .find(|candidate| candidate.id == label_id);
2059 Ok(MailboxSelection {
2060 lens_label: label
2061 .map(|label| label.name.clone())
2062 .unwrap_or_else(|| "Label".to_string()),
2063 counts: label
2064 .map(|label| {
2065 json!({
2066 "unread": label.unread_count,
2067 "total": label.total_count,
2068 })
2069 })
2070 .unwrap_or_else(|| derived_counts(&envelopes)),
2071 envelopes,
2072 })
2073 }
2074 MailboxLensKind::SavedSearch => {
2075 let name = lens
2076 .saved_search
2077 .as_deref()
2078 .ok_or_else(|| BridgeError::Ipc("saved search lens missing saved_search".into()))?;
2079 let envelopes = run_saved_search(socket_path, name, limit).await?;
2080 Ok(MailboxSelection {
2081 lens_label: chrome
2082 .searches
2083 .iter()
2084 .find(|search| search.name == name)
2085 .map(|search| search.name.clone())
2086 .unwrap_or_else(|| name.to_string()),
2087 counts: derived_counts(&envelopes),
2088 envelopes,
2089 })
2090 }
2091 MailboxLensKind::Subscription => {
2092 if let Some(sender_email) = lens.sender_email.as_deref() {
2093 let envelopes = search_envelopes(socket_path, sender_email, limit).await?;
2094 return Ok(MailboxSelection {
2095 lens_label: chrome
2096 .subscriptions
2097 .iter()
2098 .find(|subscription| subscription.sender_email == sender_email)
2099 .and_then(|subscription| subscription.sender_name.clone())
2100 .unwrap_or_else(|| sender_email.to_string()),
2101 counts: derived_counts(&envelopes),
2102 envelopes,
2103 });
2104 }
2105
2106 let message_ids = chrome
2107 .subscriptions
2108 .iter()
2109 .take(limit as usize)
2110 .map(|subscription| subscription.latest_message_id.clone())
2111 .collect::<Vec<_>>();
2112 let envelopes = list_envelopes_by_message_ids(socket_path, &message_ids).await?;
2113 Ok(MailboxSelection {
2114 lens_label: "Subscriptions".to_string(),
2115 counts: json!({
2116 "unread": chrome
2117 .subscriptions
2118 .iter()
2119 .filter(|subscription| !subscription.latest_flags.contains(MessageFlags::READ))
2120 .count(),
2121 "total": chrome.subscriptions.len(),
2122 }),
2123 envelopes,
2124 })
2125 }
2126 }
2127}
2128
2129async fn list_envelopes(
2130 socket_path: &Path,
2131 label_id: Option<LabelId>,
2132 limit: u32,
2133 offset: u32,
2134) -> Result<Vec<Envelope>, BridgeError> {
2135 match ipc_request(
2136 socket_path,
2137 Request::ListEnvelopes {
2138 label_id,
2139 account_id: None,
2140 limit,
2141 offset,
2142 },
2143 )
2144 .await?
2145 {
2146 ResponseData::Envelopes { envelopes } => Ok(envelopes),
2147 _ => Err(BridgeError::UnexpectedResponse),
2148 }
2149}
2150
2151async fn list_envelopes_by_message_ids(
2152 socket_path: &Path,
2153 message_ids: &[MessageId],
2154) -> Result<Vec<Envelope>, BridgeError> {
2155 if message_ids.is_empty() {
2156 return Ok(Vec::new());
2157 }
2158 match ipc_request(
2159 socket_path,
2160 Request::ListEnvelopesByIds {
2161 message_ids: message_ids.to_vec(),
2162 },
2163 )
2164 .await?
2165 {
2166 ResponseData::Envelopes { envelopes } => Ok(reorder_envelopes(envelopes, message_ids)),
2167 _ => Err(BridgeError::UnexpectedResponse),
2168 }
2169}
2170
2171async fn run_saved_search(
2172 socket_path: &Path,
2173 name: &str,
2174 limit: u32,
2175) -> Result<Vec<Envelope>, BridgeError> {
2176 match ipc_request(
2177 socket_path,
2178 Request::RunSavedSearch {
2179 name: name.to_string(),
2180 limit,
2181 },
2182 )
2183 .await?
2184 {
2185 ResponseData::SearchResults { results, .. } => {
2186 search_result_envelopes(socket_path, &results).await
2187 }
2188 _ => Err(BridgeError::UnexpectedResponse),
2189 }
2190}
2191
2192async fn search_envelopes(
2193 socket_path: &Path,
2194 query: &str,
2195 limit: u32,
2196) -> Result<Vec<Envelope>, BridgeError> {
2197 match ipc_request(
2198 socket_path,
2199 Request::Search {
2200 query: query.to_string(),
2201 limit,
2202 offset: 0,
2203 mode: Some(SearchMode::Lexical),
2204 sort: Some(SortOrder::DateDesc),
2205 explain: false,
2206 },
2207 )
2208 .await?
2209 {
2210 ResponseData::SearchResults { results, .. } => {
2211 search_result_envelopes(socket_path, &results).await
2212 }
2213 _ => Err(BridgeError::UnexpectedResponse),
2214 }
2215}
2216
2217async fn search_result_envelopes(
2218 socket_path: &Path,
2219 results: &[SearchResultItem],
2220) -> Result<Vec<Envelope>, BridgeError> {
2221 let message_ids = results
2222 .iter()
2223 .map(|result| result.message_id.clone())
2224 .collect::<Vec<_>>();
2225 if message_ids.is_empty() {
2226 return Ok(Vec::new());
2227 }
2228 match ipc_request(
2229 socket_path,
2230 Request::ListEnvelopesByIds {
2231 message_ids: message_ids.clone(),
2232 },
2233 )
2234 .await?
2235 {
2236 ResponseData::Envelopes { envelopes } => Ok(reorder_envelopes(envelopes, &message_ids)),
2237 _ => Err(BridgeError::UnexpectedResponse),
2238 }
2239}
2240
2241fn group_envelopes(envelopes: Vec<Envelope>) -> Vec<MessageGroupView> {
2242 let mut groups = Vec::<MessageGroupView>::new();
2243
2244 for envelope in envelopes {
2245 let (group_id, label) = date_bucket(envelope.date);
2246 let row = message_row_view(&envelope);
2247 if let Some(existing) = groups.iter_mut().find(|group| group.id == group_id) {
2248 existing.rows.push(row);
2249 } else {
2250 groups.push(MessageGroupView {
2251 id: group_id.to_string(),
2252 label: label.to_string(),
2253 rows: vec![row],
2254 });
2255 }
2256 }
2257
2258 groups
2259}
2260
2261fn date_bucket(date: DateTime<Utc>) -> (&'static str, &'static str) {
2262 let local = date.with_timezone(&Local);
2263 let today = Local::now().date_naive();
2264 let days_old = today.signed_duration_since(local.date_naive()).num_days();
2265
2266 match days_old {
2267 0 => ("today", "Today"),
2268 1 => ("yesterday", "Yesterday"),
2269 2..=6 => ("last-7-days", "Last 7 Days"),
2270 _ if local.year() == today.year() => ("earlier", "Earlier"),
2271 _ => ("older", "Older"),
2272 }
2273}
2274
2275fn message_row_view(envelope: &Envelope) -> MessageRowView {
2276 MessageRowView {
2277 id: envelope.id.to_string(),
2278 thread_id: envelope.thread_id.to_string(),
2279 provider_id: envelope.provider_id.clone(),
2280 sender: envelope
2281 .from
2282 .name
2283 .clone()
2284 .unwrap_or_else(|| envelope.from.email.clone()),
2285 sender_detail: Some(envelope.from.email.clone()),
2286 subject: envelope.subject.clone(),
2287 snippet: envelope.snippet.clone(),
2288 date_label: format_date_label(envelope.date),
2289 unread: !envelope.flags.contains(MessageFlags::READ),
2290 starred: envelope.flags.contains(MessageFlags::STARRED),
2291 has_attachments: envelope.has_attachments,
2292 }
2293}
2294
2295fn format_date_label(date: DateTime<Utc>) -> String {
2296 let local = date.with_timezone(&Local);
2297 let today = Local::now().date_naive();
2298 if today == local.date_naive() {
2299 return local.format("%-I:%M%P").to_string();
2300 }
2301 local.format("%b %-d").to_string()
2302}
2303
2304fn thread_reader_mode(bodies: &[MessageBody]) -> &'static str {
2305 let has_plain = bodies.iter().any(|body| body.text_plain.as_ref().is_some());
2306 let has_html = bodies.iter().any(|body| body.text_html.as_ref().is_some());
2307 if has_html && !has_plain {
2308 "html"
2309 } else {
2310 "reader"
2311 }
2312}
2313
2314fn reorder_envelopes(envelopes: Vec<Envelope>, order: &[MessageId]) -> Vec<Envelope> {
2315 let mut by_id = HashMap::new();
2316 for envelope in envelopes {
2317 by_id.insert(envelope.id.clone(), envelope);
2318 }
2319
2320 order.iter().filter_map(|id| by_id.remove(id)).collect()
2321}
2322
2323fn dedupe_search_results_by_thread(results: Vec<SearchResultItem>) -> Vec<SearchResultItem> {
2324 let mut seen = HashSet::new();
2325 results
2326 .into_iter()
2327 .filter(|result| seen.insert(result.thread_id.clone()))
2328 .collect()
2329}
2330
2331fn slugify(value: &str) -> String {
2332 value
2333 .to_ascii_lowercase()
2334 .chars()
2335 .map(|ch| if ch.is_ascii_alphanumeric() { ch } else { '-' })
2336 .collect::<String>()
2337 .trim_matches('-')
2338 .to_string()
2339}
2340
2341fn sorted_saved_searches(mut searches: Vec<SavedSearch>) -> Vec<SavedSearch> {
2342 searches.sort_by_key(|search| search.position);
2343 searches
2344}
2345
2346#[cfg(test)]
2347mod tests {
2348 use super::*;
2349 use chrono::Utc;
2350 use futures::{SinkExt, StreamExt};
2351 use mxr_core::{
2352 id::{AccountId, MessageId, ThreadId},
2353 types::{
2354 Address, Envelope, Label, LabelKind, MessageBody, MessageFlags, MessageMetadata,
2355 SavedSearch, SortOrder, SubscriptionSummary, Thread, UnsubscribeMethod,
2356 },
2357 };
2358 use mxr_protocol::{
2359 DaemonEvent, IpcCodec, IpcMessage, IpcPayload, Request, Response, ResponseData,
2360 SearchResultItem, IPC_PROTOCOL_VERSION,
2361 };
2362 use tempfile::TempDir;
2363 use tokio::net::UnixListener;
2364 use tokio_tungstenite::tungstenite::Message;
2365 use tokio_util::codec::Framed;
2366
2367 const TEST_AUTH_TOKEN: &str = "test-token";
2368
2369 async fn spawn_fake_ipc_server<F>(
2370 socket_path: &std::path::Path,
2371 responder: F,
2372 event: Option<DaemonEvent>,
2373 ) -> tokio::task::JoinHandle<()>
2374 where
2375 F: Fn(Request) -> Option<Response> + Send + Sync + 'static,
2376 {
2377 let responder = std::sync::Arc::new(responder);
2378 let listener = UnixListener::bind(socket_path).unwrap();
2379 tokio::spawn(async move {
2380 loop {
2381 let Ok((stream, _)) = listener.accept().await else {
2382 break;
2383 };
2384 let responder = responder.clone();
2385 let event = event.clone();
2386 tokio::spawn(async move {
2387 let mut framed = Framed::new(stream, IpcCodec::new());
2388 if let Some(event) = event {
2389 let _ = framed
2390 .send(IpcMessage {
2391 id: 0,
2392 payload: IpcPayload::Event(event),
2393 })
2394 .await;
2395 return;
2396 }
2397 while let Some(Ok(message)) = framed.next().await {
2398 if let IpcPayload::Request(request) = message.payload {
2399 let Some(response) = responder(request) else {
2400 continue;
2401 };
2402 let response = IpcMessage {
2403 id: message.id,
2404 payload: IpcPayload::Response(response),
2405 };
2406 let _ = framed.send(response).await;
2407 }
2408 }
2409 });
2410 }
2411 })
2412 }
2413
2414 async fn spawn_fake_event_server(socket_path: &std::path::Path) -> tokio::task::JoinHandle<()> {
2415 spawn_fake_ipc_server(
2416 socket_path,
2417 |_| None,
2418 Some(DaemonEvent::SyncCompleted {
2419 account_id: AccountId::new(),
2420 messages_synced: 3,
2421 }),
2422 )
2423 .await
2424 }
2425
2426 #[tokio::test]
2427 async fn status_endpoint_proxies_ipc_status() {
2428 let temp = TempDir::new().unwrap();
2429 let socket_path = temp.path().join("mxr.sock");
2430 let _ipc = spawn_fake_ipc_server(
2431 &socket_path,
2432 |request| match request {
2433 Request::GetStatus => Some(Response::Ok {
2434 data: ResponseData::Status {
2435 uptime_secs: 42,
2436 accounts: vec!["personal".into()],
2437 total_messages: 17,
2438 daemon_pid: Some(999),
2439 sync_statuses: Vec::new(),
2440 protocol_version: IPC_PROTOCOL_VERSION,
2441 daemon_version: Some("0.4.3".into()),
2442 daemon_build_id: Some("build-123".into()),
2443 repair_required: false,
2444 },
2445 }),
2446 _ => None,
2447 },
2448 None,
2449 )
2450 .await;
2451
2452 let addr = bind_and_serve(
2453 std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
2454 0,
2455 WebServerConfig::new(socket_path, TEST_AUTH_TOKEN.into()),
2456 )
2457 .await
2458 .unwrap();
2459
2460 let response = reqwest::Client::new()
2461 .get(format!("http://{addr}/status"))
2462 .header("x-mxr-bridge-token", TEST_AUTH_TOKEN)
2463 .send()
2464 .await
2465 .unwrap();
2466 assert_eq!(response.status(), reqwest::StatusCode::OK);
2467
2468 let json: serde_json::Value = response.json().await.unwrap();
2469 assert_eq!(json["protocol_version"], IPC_PROTOCOL_VERSION);
2470 assert_eq!(json["daemon_version"], "0.4.3");
2471 assert_eq!(json["total_messages"], 17);
2472 }
2473
2474 #[tokio::test]
2475 async fn websocket_events_proxy_daemon_events() {
2476 let temp = TempDir::new().unwrap();
2477 let socket_path = temp.path().join("mxr.sock");
2478 let _ipc = spawn_fake_event_server(&socket_path).await;
2479
2480 let addr = bind_and_serve(
2481 std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
2482 0,
2483 WebServerConfig::new(socket_path, TEST_AUTH_TOKEN.into()),
2484 )
2485 .await
2486 .unwrap();
2487
2488 let (mut stream, _) =
2489 tokio_tungstenite::connect_async(format!("ws://{addr}/events?token={TEST_AUTH_TOKEN}"))
2490 .await
2491 .unwrap();
2492 let message = stream.next().await.unwrap().unwrap();
2493 let text = match message {
2494 Message::Text(text) => text.to_string(),
2495 other => panic!("expected text websocket frame, got {other:?}"),
2496 };
2497
2498 assert!(text.contains("SyncCompleted"));
2499 assert!(text.contains("\"messages_synced\":3"));
2500 }
2501
2502 #[tokio::test]
2503 async fn status_endpoint_rejects_missing_token() {
2504 let temp = TempDir::new().unwrap();
2505 let socket_path = temp.path().join("mxr.sock");
2506 let _ipc = spawn_fake_ipc_server(
2507 &socket_path,
2508 |_request| {
2509 Some(Response::Ok {
2510 data: ResponseData::Ack,
2511 })
2512 },
2513 None,
2514 )
2515 .await;
2516
2517 let addr = bind_and_serve(
2518 std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
2519 0,
2520 WebServerConfig::new(socket_path, TEST_AUTH_TOKEN.into()),
2521 )
2522 .await
2523 .unwrap();
2524
2525 let response = reqwest::get(format!("http://{addr}/status")).await.unwrap();
2526 assert_eq!(response.status(), reqwest::StatusCode::UNAUTHORIZED);
2527 }
2528
2529 fn sample_envelope() -> Envelope {
2530 Envelope {
2531 id: MessageId::new(),
2532 account_id: AccountId::new(),
2533 provider_id: "provider-msg-1".into(),
2534 thread_id: ThreadId::new(),
2535 message_id_header: Some("<msg-1@example.com>".into()),
2536 in_reply_to: None,
2537 references: Vec::new(),
2538 from: Address {
2539 name: Some("Sender".into()),
2540 email: "sender@example.com".into(),
2541 },
2542 to: vec![Address {
2543 name: Some("User".into()),
2544 email: "user@example.com".into(),
2545 }],
2546 cc: Vec::new(),
2547 bcc: Vec::new(),
2548 subject: "Mailroom".into(),
2549 date: Utc::now(),
2550 flags: MessageFlags::empty(),
2551 snippet: "Preview".into(),
2552 has_attachments: false,
2553 size_bytes: 128,
2554 unsubscribe: UnsubscribeMethod::None,
2555 label_provider_ids: Vec::new(),
2556 }
2557 }
2558
2559 fn sample_labels(account_id: &AccountId) -> Vec<Label> {
2560 vec![
2561 Label {
2562 id: mxr_core::LabelId::new(),
2563 account_id: account_id.clone(),
2564 name: "Inbox".into(),
2565 kind: LabelKind::System,
2566 color: None,
2567 provider_id: "INBOX".into(),
2568 unread_count: 12,
2569 total_count: 144,
2570 },
2571 Label {
2572 id: mxr_core::LabelId::new(),
2573 account_id: account_id.clone(),
2574 name: "All Mail".into(),
2575 kind: LabelKind::System,
2576 color: None,
2577 provider_id: "ALL_MAIL".into(),
2578 unread_count: 24,
2579 total_count: 8124,
2580 },
2581 Label {
2582 id: mxr_core::LabelId::new(),
2583 account_id: account_id.clone(),
2584 name: "Follow Up".into(),
2585 kind: LabelKind::User,
2586 color: None,
2587 provider_id: "follow-up".into(),
2588 unread_count: 2,
2589 total_count: 18,
2590 },
2591 ]
2592 }
2593
2594 fn sample_saved_search(account_id: AccountId) -> SavedSearch {
2595 SavedSearch {
2596 id: mxr_core::SavedSearchId::new(),
2597 account_id: Some(account_id),
2598 name: "Today".into(),
2599 query: "in:inbox newer_than:1d".into(),
2600 search_mode: SearchMode::Lexical,
2601 sort: SortOrder::DateDesc,
2602 icon: Some("sun".into()),
2603 position: 0,
2604 created_at: Utc::now(),
2605 }
2606 }
2607
2608 fn sample_subscription(account_id: &AccountId) -> SubscriptionSummary {
2609 SubscriptionSummary {
2610 account_id: account_id.clone(),
2611 sender_name: Some("Readwise".into()),
2612 sender_email: "hello@readwise.io".into(),
2613 message_count: 4,
2614 latest_message_id: MessageId::new(),
2615 latest_provider_id: "provider-subscription-1".into(),
2616 latest_thread_id: ThreadId::new(),
2617 latest_subject: "Latest digest".into(),
2618 latest_snippet: "Highlights from this week".into(),
2619 latest_date: Utc::now(),
2620 latest_flags: MessageFlags::empty(),
2621 latest_has_attachments: false,
2622 latest_size_bytes: 256,
2623 unsubscribe: UnsubscribeMethod::None,
2624 }
2625 }
2626
2627 fn sample_account(account_id: &AccountId) -> mxr_protocol::AccountSummaryData {
2628 mxr_protocol::AccountSummaryData {
2629 account_id: account_id.clone(),
2630 key: Some("personal".into()),
2631 name: "Personal".into(),
2632 email: "me@example.com".into(),
2633 provider_kind: "gmail".into(),
2634 sync_kind: Some("gmail".into()),
2635 send_kind: Some("smtp".into()),
2636 enabled: true,
2637 is_default: true,
2638 source: mxr_protocol::AccountSourceData::Runtime,
2639 editable: mxr_protocol::AccountEditModeData::Full,
2640 sync: None,
2641 send: None,
2642 }
2643 }
2644
2645 fn sample_thread(envelope: &Envelope) -> Thread {
2646 Thread {
2647 id: envelope.thread_id.clone(),
2648 account_id: envelope.account_id.clone(),
2649 subject: envelope.subject.clone(),
2650 participants: vec![envelope.from.clone()],
2651 message_count: 1,
2652 unread_count: 1,
2653 latest_date: envelope.date,
2654 snippet: envelope.snippet.clone(),
2655 }
2656 }
2657
2658 fn sample_body(envelope: &Envelope) -> MessageBody {
2659 MessageBody {
2660 message_id: envelope.id.clone(),
2661 text_plain: Some("plain text".into()),
2662 text_html: Some("<p>rich html</p>".into()),
2663 attachments: Vec::new(),
2664 fetched_at: Utc::now(),
2665 metadata: MessageMetadata::default(),
2666 }
2667 }
2668
2669 #[tokio::test]
2670 async fn mailbox_endpoint_lists_envelopes() {
2671 let temp = TempDir::new().unwrap();
2672 let socket_path = temp.path().join("mxr.sock");
2673 let expected = sample_envelope();
2674 let expected_id = expected.id.to_string();
2675 let labels = sample_labels(&expected.account_id);
2676 let saved_search = sample_saved_search(expected.account_id.clone());
2677 let subscription = sample_subscription(&expected.account_id);
2678 let inbox_label_id = labels[0].id.clone();
2679 let _ipc = spawn_fake_ipc_server(
2680 &socket_path,
2681 move |request| match request {
2682 Request::GetStatus => Some(Response::Ok {
2683 data: ResponseData::Status {
2684 uptime_secs: 42,
2685 accounts: vec!["personal".into()],
2686 total_messages: 8124,
2687 daemon_pid: Some(999),
2688 sync_statuses: Vec::new(),
2689 protocol_version: IPC_PROTOCOL_VERSION,
2690 daemon_version: Some("0.4.4".into()),
2691 daemon_build_id: Some("build-123".into()),
2692 repair_required: false,
2693 },
2694 }),
2695 Request::ListLabels { account_id: None } => Some(Response::Ok {
2696 data: ResponseData::Labels {
2697 labels: labels.clone(),
2698 },
2699 }),
2700 Request::ListSavedSearches => Some(Response::Ok {
2701 data: ResponseData::SavedSearches {
2702 searches: vec![saved_search.clone()],
2703 },
2704 }),
2705 Request::ListSubscriptions { limit: 8 } => Some(Response::Ok {
2706 data: ResponseData::Subscriptions {
2707 subscriptions: vec![subscription.clone()],
2708 },
2709 }),
2710 Request::ListEnvelopes {
2711 limit: 200,
2712 offset: 0,
2713 label_id: Some(label_id),
2714 account_id: None,
2715 } if label_id == inbox_label_id => Some(Response::Ok {
2716 data: ResponseData::Envelopes {
2717 envelopes: vec![expected.clone()],
2718 },
2719 }),
2720 _ => None,
2721 },
2722 None,
2723 )
2724 .await;
2725
2726 let addr = bind_and_serve(
2727 std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
2728 0,
2729 WebServerConfig::new(socket_path, TEST_AUTH_TOKEN.into()),
2730 )
2731 .await
2732 .unwrap();
2733
2734 let response = reqwest::Client::new()
2735 .get(format!("http://{addr}/mailbox"))
2736 .header("x-mxr-bridge-token", TEST_AUTH_TOKEN)
2737 .send()
2738 .await
2739 .unwrap();
2740 assert_eq!(response.status(), reqwest::StatusCode::OK);
2741
2742 let json: serde_json::Value = response.json().await.unwrap();
2743 assert_eq!(json["shell"]["statusMessage"], "Local-first and ready");
2744 assert_eq!(json["sidebar"]["sections"][0]["title"], "System");
2745 assert_eq!(json["sidebar"]["sections"][1]["title"], "Labels");
2746 assert!(json["sidebar"]["sections"][0]["items"]
2747 .as_array()
2748 .unwrap()
2749 .iter()
2750 .any(|item| item["label"] == "Subscriptions"));
2751 assert_eq!(json["mailbox"]["lensLabel"], "Inbox");
2752 assert_eq!(json["mailbox"]["groups"][0]["rows"][0]["id"], expected_id);
2753 assert_eq!(
2754 json["mailbox"]["groups"][0]["rows"][0]["subject"],
2755 "Mailroom"
2756 );
2757 }
2758
2759 #[tokio::test]
2760 async fn mailbox_endpoint_supports_all_mail_lens() {
2761 let temp = TempDir::new().unwrap();
2762 let socket_path = temp.path().join("mxr.sock");
2763 let mut expected = sample_envelope();
2764 expected.subject = "Archive rollup".into();
2765 expected.snippet = "Everything local, nothing filtered.".into();
2766 let labels = sample_labels(&expected.account_id);
2767 let saved_search = sample_saved_search(expected.account_id.clone());
2768 let subscription = sample_subscription(&expected.account_id);
2769 let _ipc = spawn_fake_ipc_server(
2770 &socket_path,
2771 move |request| match request {
2772 Request::GetStatus => Some(Response::Ok {
2773 data: ResponseData::Status {
2774 uptime_secs: 42,
2775 accounts: vec!["personal".into()],
2776 total_messages: 8124,
2777 daemon_pid: Some(999),
2778 sync_statuses: Vec::new(),
2779 protocol_version: IPC_PROTOCOL_VERSION,
2780 daemon_version: Some("0.4.4".into()),
2781 daemon_build_id: Some("build-123".into()),
2782 repair_required: false,
2783 },
2784 }),
2785 Request::ListLabels { account_id: None } => Some(Response::Ok {
2786 data: ResponseData::Labels {
2787 labels: labels.clone(),
2788 },
2789 }),
2790 Request::ListSavedSearches => Some(Response::Ok {
2791 data: ResponseData::SavedSearches {
2792 searches: vec![saved_search.clone()],
2793 },
2794 }),
2795 Request::ListSubscriptions { limit: 8 } => Some(Response::Ok {
2796 data: ResponseData::Subscriptions {
2797 subscriptions: vec![subscription.clone()],
2798 },
2799 }),
2800 Request::ListEnvelopes {
2801 limit: 200,
2802 offset: 0,
2803 label_id: None,
2804 account_id: None,
2805 } => Some(Response::Ok {
2806 data: ResponseData::Envelopes {
2807 envelopes: vec![expected.clone()],
2808 },
2809 }),
2810 _ => None,
2811 },
2812 None,
2813 )
2814 .await;
2815
2816 let addr = bind_and_serve(
2817 std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
2818 0,
2819 WebServerConfig::new(socket_path, TEST_AUTH_TOKEN.into()),
2820 )
2821 .await
2822 .unwrap();
2823
2824 let response = reqwest::Client::new()
2825 .get(format!("http://{addr}/mailbox?lens_kind=all_mail"))
2826 .header("x-mxr-bridge-token", TEST_AUTH_TOKEN)
2827 .send()
2828 .await
2829 .unwrap();
2830 assert_eq!(response.status(), reqwest::StatusCode::OK);
2831
2832 let json: serde_json::Value = response.json().await.unwrap();
2833 assert_eq!(json["mailbox"]["lensLabel"], "All Mail");
2834 assert_eq!(json["mailbox"]["counts"]["total"], 8124);
2835 assert!(json["sidebar"]["sections"][0]["items"]
2836 .as_array()
2837 .unwrap()
2838 .iter()
2839 .any(|item| item["label"] == "All Mail" && item["active"] == true));
2840 assert_eq!(
2841 json["mailbox"]["groups"][0]["rows"][0]["subject"],
2842 "Archive rollup"
2843 );
2844 }
2845
2846 #[tokio::test]
2847 async fn thread_endpoint_returns_messages_and_bodies() {
2848 let temp = TempDir::new().unwrap();
2849 let socket_path = temp.path().join("mxr.sock");
2850 let envelope = sample_envelope();
2851 let thread = sample_thread(&envelope);
2852 let body = sample_body(&envelope);
2853 let thread_id = thread.id.to_string();
2854 let message_id = envelope.id.to_string();
2855 let _ipc = spawn_fake_ipc_server(
2856 &socket_path,
2857 move |request| match request {
2858 Request::GetThread {
2859 thread_id: requested,
2860 } if requested == thread.id => Some(Response::Ok {
2861 data: ResponseData::Thread {
2862 thread: thread.clone(),
2863 messages: vec![envelope.clone()],
2864 },
2865 }),
2866 Request::ListBodies { message_ids }
2867 if message_ids == vec![body.message_id.clone()] =>
2868 {
2869 Some(Response::Ok {
2870 data: ResponseData::Bodies {
2871 bodies: vec![body.clone()],
2872 },
2873 })
2874 }
2875 _ => None,
2876 },
2877 None,
2878 )
2879 .await;
2880
2881 let addr = bind_and_serve(
2882 std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
2883 0,
2884 WebServerConfig::new(socket_path, TEST_AUTH_TOKEN.into()),
2885 )
2886 .await
2887 .unwrap();
2888
2889 let response = reqwest::Client::new()
2890 .get(format!("http://{addr}/thread/{thread_id}"))
2891 .header("x-mxr-bridge-token", TEST_AUTH_TOKEN)
2892 .send()
2893 .await
2894 .unwrap();
2895 assert_eq!(response.status(), reqwest::StatusCode::OK);
2896
2897 let json: serde_json::Value = response.json().await.unwrap();
2898 assert_eq!(json["thread"]["id"], thread_id);
2899 assert_eq!(json["messages"][0]["id"], message_id);
2900 assert_eq!(json["bodies"][0]["text_html"], "<p>rich html</p>");
2901 assert_eq!(json["reader_mode"], "reader");
2902 assert_eq!(json["right_rail"]["title"], "Thread context");
2903 }
2904
2905 #[tokio::test]
2906 async fn search_endpoint_proxies_results() {
2907 let temp = TempDir::new().unwrap();
2908 let socket_path = temp.path().join("mxr.sock");
2909 let envelope = sample_envelope();
2910 let result = SearchResultItem {
2911 message_id: envelope.id.clone(),
2912 account_id: envelope.account_id.clone(),
2913 thread_id: envelope.thread_id.clone(),
2914 score: 9.5,
2915 mode: mxr_core::types::SearchMode::Lexical,
2916 };
2917 let message_id = result.message_id.to_string();
2918 let message_ids = vec![result.message_id.clone()];
2919 let _ipc = spawn_fake_ipc_server(
2920 &socket_path,
2921 move |request| match request {
2922 Request::Search {
2923 query,
2924 limit: 200,
2925 offset: 0,
2926 mode: None,
2927 sort: Some(SortOrder::DateDesc),
2928 explain: false,
2929 } if query == "buildkite" => Some(Response::Ok {
2930 data: ResponseData::SearchResults {
2931 results: vec![result.clone()],
2932 has_more: false,
2933 explain: None,
2934 },
2935 }),
2936 Request::ListEnvelopesByIds {
2937 message_ids: requested,
2938 } if requested == message_ids => Some(Response::Ok {
2939 data: ResponseData::Envelopes {
2940 envelopes: vec![envelope.clone()],
2941 },
2942 }),
2943 _ => None,
2944 },
2945 None,
2946 )
2947 .await;
2948
2949 let addr = bind_and_serve(
2950 std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
2951 0,
2952 WebServerConfig::new(socket_path, TEST_AUTH_TOKEN.into()),
2953 )
2954 .await
2955 .unwrap();
2956
2957 let response = reqwest::Client::new()
2958 .get(format!("http://{addr}/search?q=buildkite"))
2959 .header("x-mxr-bridge-token", TEST_AUTH_TOKEN)
2960 .send()
2961 .await
2962 .unwrap();
2963 assert_eq!(response.status(), reqwest::StatusCode::OK);
2964
2965 let json: serde_json::Value = response.json().await.unwrap();
2966 assert_eq!(json["total"], 1);
2967 assert_eq!(json["groups"][0]["rows"][0]["id"], message_id);
2968 assert_eq!(json["groups"][0]["rows"][0]["subject"], "Mailroom");
2969 assert!(json["explain"].is_null());
2970 }
2971
2972 #[tokio::test]
2973 async fn search_endpoint_supports_mode_sort_and_explain() {
2974 let temp = TempDir::new().unwrap();
2975 let socket_path = temp.path().join("mxr.sock");
2976 let mut older = sample_envelope();
2977 older.subject = "Older deploy".into();
2978 older.date = Utc::now() - chrono::Duration::days(1);
2979 let mut newer = sample_envelope();
2980 newer.subject = "Newest deploy".into();
2981 newer.date = Utc::now();
2982
2983 let older_result = SearchResultItem {
2984 message_id: older.id.clone(),
2985 account_id: older.account_id.clone(),
2986 thread_id: older.thread_id.clone(),
2987 score: 1.5,
2988 mode: mxr_core::types::SearchMode::Semantic,
2989 };
2990 let newer_result = SearchResultItem {
2991 message_id: newer.id.clone(),
2992 account_id: newer.account_id.clone(),
2993 thread_id: newer.thread_id.clone(),
2994 score: 0.8,
2995 mode: mxr_core::types::SearchMode::Semantic,
2996 };
2997 let requested_ids = vec![newer.id.clone(), older.id.clone()];
2998 let explain = mxr_protocol::SearchExplain {
2999 requested_mode: SearchMode::Semantic,
3000 executed_mode: SearchMode::Semantic,
3001 semantic_query: Some("deploy".into()),
3002 lexical_window: 50,
3003 dense_window: Some(50),
3004 lexical_candidates: 2,
3005 dense_candidates: 2,
3006 final_results: 2,
3007 rrf_k: Some(60),
3008 notes: vec!["semantic rerank".into()],
3009 results: vec![mxr_protocol::SearchExplainResult {
3010 rank: 1,
3011 message_id: newer.id.clone(),
3012 final_score: 1.0,
3013 lexical_rank: Some(2),
3014 lexical_score: Some(0.2),
3015 dense_rank: Some(1),
3016 dense_score: Some(0.9),
3017 }],
3018 };
3019
3020 let _ipc = spawn_fake_ipc_server(
3021 &socket_path,
3022 move |request| match request {
3023 Request::Search {
3024 query,
3025 limit: 200,
3026 offset: 0,
3027 mode: Some(SearchMode::Semantic),
3028 sort: Some(SortOrder::DateDesc),
3029 explain: true,
3030 } if query == "deploy" => Some(Response::Ok {
3031 data: ResponseData::SearchResults {
3032 results: vec![newer_result.clone(), older_result.clone()],
3033 has_more: false,
3034 explain: Some(explain.clone()),
3035 },
3036 }),
3037 Request::ListEnvelopesByIds { message_ids } if message_ids == requested_ids => {
3038 Some(Response::Ok {
3039 data: ResponseData::Envelopes {
3040 envelopes: vec![older.clone(), newer.clone()],
3041 },
3042 })
3043 }
3044 _ => None,
3045 },
3046 None,
3047 )
3048 .await;
3049
3050 let addr = bind_and_serve(
3051 std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
3052 0,
3053 WebServerConfig::new(socket_path, TEST_AUTH_TOKEN.into()),
3054 )
3055 .await
3056 .unwrap();
3057
3058 let response = reqwest::Client::new()
3059 .get(format!(
3060 "http://{addr}/search?q=deploy&mode=semantic&sort=recent&explain=true"
3061 ))
3062 .header("x-mxr-bridge-token", TEST_AUTH_TOKEN)
3063 .send()
3064 .await
3065 .unwrap();
3066 assert_eq!(response.status(), reqwest::StatusCode::OK);
3067
3068 let json: serde_json::Value = response.json().await.unwrap();
3069 assert_eq!(json["mode"], "semantic");
3070 assert_eq!(json["sort"], "recent");
3071 assert_eq!(json["explain"]["requested_mode"], "semantic");
3072 assert_eq!(json["groups"][0]["rows"][0]["subject"], "Newest deploy");
3073 assert_eq!(json["groups"][1]["rows"][0]["subject"], "Older deploy");
3074 }
3075
3076 #[tokio::test]
3077 async fn search_endpoint_dedupes_threads_by_thread_id() {
3078 let temp = TempDir::new().unwrap();
3079 let socket_path = temp.path().join("mxr.sock");
3080
3081 let mut first = sample_envelope();
3082 first.subject = "First match".into();
3083 first.snippet = "first".into();
3084
3085 let mut second = sample_envelope();
3086 second.id = MessageId::new();
3087 second.subject = "Second match same thread".into();
3088 second.snippet = "second".into();
3089 second.thread_id = first.thread_id.clone();
3090
3091 let results = vec![
3092 SearchResultItem {
3093 message_id: first.id.clone(),
3094 account_id: first.account_id.clone(),
3095 thread_id: first.thread_id.clone(),
3096 score: 9.5,
3097 mode: mxr_core::types::SearchMode::Lexical,
3098 },
3099 SearchResultItem {
3100 message_id: second.id.clone(),
3101 account_id: second.account_id.clone(),
3102 thread_id: second.thread_id.clone(),
3103 score: 9.0,
3104 mode: mxr_core::types::SearchMode::Lexical,
3105 },
3106 ];
3107 let requested_ids = vec![first.id.clone()];
3108
3109 let _ipc = spawn_fake_ipc_server(
3110 &socket_path,
3111 move |request| match request {
3112 Request::Search {
3113 query,
3114 limit: 200,
3115 offset: 0,
3116 mode: None,
3117 sort: Some(SortOrder::DateDesc),
3118 explain: false,
3119 } if query == "dalumuzi" => Some(Response::Ok {
3120 data: ResponseData::SearchResults {
3121 results: results.clone(),
3122 has_more: false,
3123 explain: None,
3124 },
3125 }),
3126 Request::ListEnvelopesByIds { message_ids } if message_ids == requested_ids => {
3127 Some(Response::Ok {
3128 data: ResponseData::Envelopes {
3129 envelopes: vec![first.clone()],
3130 },
3131 })
3132 }
3133 _ => None,
3134 },
3135 None,
3136 )
3137 .await;
3138
3139 let addr = bind_and_serve(
3140 std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
3141 0,
3142 WebServerConfig::new(socket_path, TEST_AUTH_TOKEN.into()),
3143 )
3144 .await
3145 .unwrap();
3146
3147 let response = reqwest::Client::new()
3148 .get(format!("http://{addr}/search?q=dalumuzi&scope=threads"))
3149 .header("x-mxr-bridge-token", TEST_AUTH_TOKEN)
3150 .send()
3151 .await
3152 .unwrap();
3153 assert_eq!(response.status(), reqwest::StatusCode::OK);
3154
3155 let json: serde_json::Value = response.json().await.unwrap();
3156 assert_eq!(json["total"], 1);
3157 assert_eq!(json["groups"][0]["rows"].as_array().unwrap().len(), 1);
3158 assert_eq!(json["groups"][0]["rows"][0]["subject"], "First match");
3159 }
3160
3161 #[tokio::test]
3162 async fn compose_session_endpoint_prepares_reply_draft() {
3163 let temp = TempDir::new().unwrap();
3164 let socket_path = temp.path().join("mxr.sock");
3165 let envelope = sample_envelope();
3166 let account = sample_account(&envelope.account_id);
3167 let message_id = envelope.id.to_string();
3168 let expected_account_id = envelope.account_id.to_string();
3169 let expected_message_id = envelope.id.clone();
3170 let _ipc = spawn_fake_ipc_server(
3171 &socket_path,
3172 move |request| match request {
3173 Request::ListAccounts => Some(Response::Ok {
3174 data: ResponseData::Accounts {
3175 accounts: vec![account.clone()],
3176 },
3177 }),
3178 Request::GetEnvelope { message_id } if message_id == expected_message_id => {
3179 Some(Response::Ok {
3180 data: ResponseData::Envelope {
3181 envelope: envelope.clone(),
3182 },
3183 })
3184 }
3185 Request::PrepareReply {
3186 message_id,
3187 reply_all: false,
3188 } if message_id == expected_message_id => Some(Response::Ok {
3189 data: ResponseData::ReplyContext {
3190 context: mxr_protocol::ReplyContext {
3191 in_reply_to: "<msg-1@example.com>".into(),
3192 references: vec!["<root@example.com>".into()],
3193 reply_to: "sender@example.com".into(),
3194 cc: String::new(),
3195 subject: "Mailroom".into(),
3196 from: "sender@example.com".into(),
3197 thread_context: "Original thread context".into(),
3198 },
3199 },
3200 }),
3201 _ => None,
3202 },
3203 None,
3204 )
3205 .await;
3206
3207 let addr = bind_and_serve(
3208 std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
3209 0,
3210 WebServerConfig::new(socket_path, TEST_AUTH_TOKEN.into()),
3211 )
3212 .await
3213 .unwrap();
3214
3215 let response = reqwest::Client::new()
3216 .post(format!("http://{addr}/compose/session"))
3217 .header("x-mxr-bridge-token", TEST_AUTH_TOKEN)
3218 .json(&serde_json::json!({
3219 "kind": "reply",
3220 "message_id": message_id,
3221 }))
3222 .send()
3223 .await
3224 .unwrap();
3225 assert_eq!(response.status(), reqwest::StatusCode::OK);
3226
3227 let json: serde_json::Value = response.json().await.unwrap();
3228 assert_eq!(json["session"]["kind"], "reply");
3229 assert_eq!(json["session"]["accountId"], expected_account_id);
3230 assert_eq!(json["session"]["frontmatter"]["to"], "sender@example.com");
3231 assert_eq!(json["session"]["frontmatter"]["subject"], "Re: Mailroom");
3232 assert_eq!(json["session"]["issues"].as_array().unwrap().len(), 1);
3233 }
3234
3235 #[tokio::test]
3236 async fn archive_mutation_endpoint_proxies_message_ids() {
3237 let temp = TempDir::new().unwrap();
3238 let socket_path = temp.path().join("mxr.sock");
3239 let expected = sample_envelope();
3240 let expected_id = expected.id.to_string();
3241 let captured = std::sync::Arc::new(std::sync::Mutex::new(Vec::<String>::new()));
3242 let captured_ids = captured.clone();
3243 let _ipc = spawn_fake_ipc_server(
3244 &socket_path,
3245 move |request| match request {
3246 Request::Mutation(mxr_protocol::MutationCommand::Archive { message_ids }) => {
3247 *captured_ids.lock().unwrap() =
3248 message_ids.iter().map(ToString::to_string).collect();
3249 Some(Response::Ok {
3250 data: ResponseData::Ack,
3251 })
3252 }
3253 _ => None,
3254 },
3255 None,
3256 )
3257 .await;
3258
3259 let addr = bind_and_serve(
3260 std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
3261 0,
3262 WebServerConfig::new(socket_path, TEST_AUTH_TOKEN.into()),
3263 )
3264 .await
3265 .unwrap();
3266
3267 let client = reqwest::Client::new();
3268 let response = client
3269 .post(format!("http://{addr}/mutations/archive"))
3270 .header("x-mxr-bridge-token", TEST_AUTH_TOKEN)
3271 .json(&serde_json::json!({ "message_ids": [expected_id] }))
3272 .send()
3273 .await
3274 .unwrap();
3275 assert_eq!(response.status(), reqwest::StatusCode::OK);
3276
3277 let json: serde_json::Value = response.json().await.unwrap();
3278 assert_eq!(json["ok"], true);
3279 assert_eq!(*captured.lock().unwrap(), vec![expected.id.to_string()]);
3280 }
3281
3282 #[tokio::test]
3283 async fn star_mutation_endpoint_proxies_message_ids_and_state() {
3284 let temp = TempDir::new().unwrap();
3285 let socket_path = temp.path().join("mxr.sock");
3286 let expected = sample_envelope();
3287 let expected_id = expected.id.to_string();
3288 let captured = std::sync::Arc::new(std::sync::Mutex::new((Vec::<String>::new(), false)));
3289 let captured_state = captured.clone();
3290 let _ipc = spawn_fake_ipc_server(
3291 &socket_path,
3292 move |request| match request {
3293 Request::Mutation(mxr_protocol::MutationCommand::Star {
3294 message_ids,
3295 starred,
3296 }) => {
3297 *captured_state.lock().unwrap() = (
3298 message_ids.iter().map(ToString::to_string).collect(),
3299 starred,
3300 );
3301 Some(Response::Ok {
3302 data: ResponseData::Ack,
3303 })
3304 }
3305 _ => None,
3306 },
3307 None,
3308 )
3309 .await;
3310
3311 let addr = bind_and_serve(
3312 std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
3313 0,
3314 WebServerConfig::new(socket_path, TEST_AUTH_TOKEN.into()),
3315 )
3316 .await
3317 .unwrap();
3318
3319 let client = reqwest::Client::new();
3320 let response = client
3321 .post(format!("http://{addr}/mutations/star"))
3322 .header("x-mxr-bridge-token", TEST_AUTH_TOKEN)
3323 .json(&serde_json::json!({ "message_ids": [expected_id], "starred": true }))
3324 .send()
3325 .await
3326 .unwrap();
3327 assert_eq!(response.status(), reqwest::StatusCode::OK);
3328
3329 let json: serde_json::Value = response.json().await.unwrap();
3330 assert_eq!(json["ok"], true);
3331 assert_eq!(
3332 *captured.lock().unwrap(),
3333 (vec![expected.id.to_string()], true)
3334 );
3335 }
3336
3337 #[tokio::test]
3338 async fn export_thread_endpoint_proxies_markdown_export() {
3339 let temp = TempDir::new().unwrap();
3340 let socket_path = temp.path().join("mxr.sock");
3341 let envelope = sample_envelope();
3342 let thread_id = envelope.thread_id.to_string();
3343 let expected_thread_id = envelope.thread_id.clone();
3344 let _ipc = spawn_fake_ipc_server(
3345 &socket_path,
3346 move |request| match request {
3347 Request::ExportThread { thread_id, .. } if thread_id == expected_thread_id => {
3348 Some(Response::Ok {
3349 data: ResponseData::ExportResult {
3350 content: "# Exported thread".into(),
3351 },
3352 })
3353 }
3354 _ => None,
3355 },
3356 None,
3357 )
3358 .await;
3359
3360 let addr = bind_and_serve(
3361 std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
3362 0,
3363 WebServerConfig::new(socket_path, TEST_AUTH_TOKEN.into()),
3364 )
3365 .await
3366 .unwrap();
3367
3368 let response = reqwest::Client::new()
3369 .get(format!("http://{addr}/thread/{thread_id}/export"))
3370 .header("x-mxr-bridge-token", TEST_AUTH_TOKEN)
3371 .send()
3372 .await
3373 .unwrap();
3374 assert_eq!(response.status(), reqwest::StatusCode::OK);
3375
3376 let json: serde_json::Value = response.json().await.unwrap();
3377 assert_eq!(json["content"], "# Exported thread");
3378 }
3379
3380 #[tokio::test]
3381 async fn bug_report_endpoint_proxies_daemon_report() {
3382 let temp = TempDir::new().unwrap();
3383 let socket_path = temp.path().join("mxr.sock");
3384 let _ipc = spawn_fake_ipc_server(
3385 &socket_path,
3386 move |request| match request {
3387 Request::GenerateBugReport {
3388 verbose: false,
3389 full_logs: false,
3390 since: None,
3391 } => Some(Response::Ok {
3392 data: ResponseData::BugReport {
3393 content: "bug report".into(),
3394 },
3395 }),
3396 _ => None,
3397 },
3398 None,
3399 )
3400 .await;
3401
3402 let addr = bind_and_serve(
3403 std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
3404 0,
3405 WebServerConfig::new(socket_path, TEST_AUTH_TOKEN.into()),
3406 )
3407 .await
3408 .unwrap();
3409
3410 let response = reqwest::Client::new()
3411 .get(format!("http://{addr}/diagnostics/bug-report"))
3412 .header("x-mxr-bridge-token", TEST_AUTH_TOKEN)
3413 .send()
3414 .await
3415 .unwrap();
3416 assert_eq!(response.status(), reqwest::StatusCode::OK);
3417
3418 let json: serde_json::Value = response.json().await.unwrap();
3419 assert_eq!(json["content"], "bug report");
3420 }
3421
3422 #[tokio::test]
3423 async fn read_and_archive_endpoint_proxies_mutation() {
3424 let temp = TempDir::new().unwrap();
3425 let socket_path = temp.path().join("mxr.sock");
3426 let expected_message_id = MessageId::new();
3427 let expected_message_id_text = expected_message_id.to_string();
3428 let _ipc = spawn_fake_ipc_server(
3429 &socket_path,
3430 move |request| match request {
3431 Request::Mutation(mxr_protocol::MutationCommand::ReadAndArchive {
3432 message_ids,
3433 }) => {
3434 assert_eq!(message_ids, vec![expected_message_id.clone()]);
3435 Some(Response::Ok {
3436 data: ResponseData::Ack,
3437 })
3438 }
3439 _ => None,
3440 },
3441 None,
3442 )
3443 .await;
3444
3445 let addr = bind_and_serve(
3446 std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
3447 0,
3448 WebServerConfig::new(socket_path, TEST_AUTH_TOKEN.into()),
3449 )
3450 .await
3451 .unwrap();
3452
3453 let response = reqwest::Client::new()
3454 .post(format!("http://{addr}/mutations/read-and-archive"))
3455 .header("x-mxr-bridge-token", TEST_AUTH_TOKEN)
3456 .json(&json!({
3457 "message_ids": [expected_message_id_text],
3458 }))
3459 .send()
3460 .await
3461 .unwrap();
3462 assert_eq!(response.status(), reqwest::StatusCode::OK);
3463 }
3464}