1use std::sync::Arc;
5use std::time::Duration;
6
7use chrono::Utc;
8use sqlx::SqlitePool;
9use tokio::sync::mpsc;
10use tokio_util::sync::CancellationToken;
11
12use tokio::sync::broadcast;
13
14use crate::config::CoreConfig;
15use crate::db::audit::{AuditAppend, AuditEntry};
16use crate::db::bounce_rules::BounceRule;
17use crate::db::chaos_configs::ChaosConfig;
18use crate::db::emails::{EmailDetail, EmailSummary};
19use crate::db::mailboxes::{
20 CreateEphemeralInput, CreateMailboxInput, EphemeralHandle, Mailbox, UpdateMailboxInput,
21};
22use crate::db::settings::{BackendSettings, SettingsPatch};
23use crate::db::{audit as db_audit, bounce_rules, chaos_configs, emails as db_emails,
24 mailboxes as db_mb, pool as db_pool, settings as db_settings};
25use crate::error::Result;
26use crate::events::{ChannelSink, ComposedSink, CoreEvent, EventSink, ServerStatus};
27use crate::http;
28use crate::mailbox::kinds::MailboxKind;
29use crate::mailbox::lifecycle::{self, ExpiryMsg};
30use crate::mailbox::service::MailboxService;
31use crate::pipeline::{ingest, retention};
32use crate::smtp::session::CapturedEnvelope;
33
/// Public facade over the core: every operation in this file is a method on `Service`.
pub struct Service {
    /// Shared state; the same `Arc` is also handed out wrapped in a `ServiceHandle`.
    inner: Arc<Inner>,
}
37
/// Result of one pass of `Service::scan_for_match` over already-stored emails.
#[derive(Debug)]
struct ScanResult {
    /// The first email whose full detail satisfied the predicate, if any.
    matched: Option<EmailDetail>,
    /// Summaries that were examined and rejected before the match was found
    /// (all examined summaries when nothing matched).
    seen: Vec<EmailSummary>,
}
43
/// State shared between `Service` and every `ServiceHandle` (both hold an `Arc<Inner>`).
pub(crate) struct Inner {
    /// Configuration the service was built with.
    pub config: CoreConfig,
    /// SQLite pool opened from `config.db_path` in `Service::build`.
    pub pool: SqlitePool,
    /// Mailbox service that owns the per-mailbox listeners.
    pub mailboxes: Arc<MailboxService>,
    /// Composed sink: fans events out to the caller-supplied sink and to `events`.
    pub sink: Arc<dyn EventSink>,
    /// Channel sink backing `Service::subscribe`.
    pub events: ChannelSink,
    /// Token cloned into the ingest, retention and TTL tasks when they are spawned.
    pub cancel: CancellationToken,
    /// Handle of the running HTTP server; `None` while the HTTP API is not running.
    http_handle: parking_lot::Mutex<Option<http::HttpServerHandle>>,
    /// Held for the whole of `Service::restart_http` so restarts do not interleave.
    http_restart_lock: tokio::sync::Mutex<()>,
    /// Optional callback invoked with the `debug_logging` setting.
    log_controller: parking_lot::Mutex<Option<Arc<dyn Fn(bool) + Send + Sync>>>,
    /// Set by `Service::start_all`, cleared by `Service::stop_all`.
    started: parking_lot::Mutex<bool>,
    /// Join handle of the ingest task; stored but never read in this file.
    _ingest_task: tokio::task::JoinHandle<()>,
    /// Join handle of the periodic retention task; stored but never read in this file.
    _retention_task: tokio::task::JoinHandle<()>,
    /// Join handle of the mailbox TTL/expiry task; stored but never read in this file.
    _ttl_task: tokio::task::JoinHandle<()>,
}
70
71impl std::fmt::Debug for Service {
72 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
73 f.debug_struct("Service").finish()
74 }
75}
76
77impl Service {
78 pub async fn build(cfg: CoreConfig, sink: Arc<dyn EventSink>) -> Result<Service> {
81 cfg.ensure_dirs().await?;
82 let pool = db_pool::open(&cfg.db_path).await?;
83 crate::db::migrate::run(&pool).await?;
84
85 let cancel = CancellationToken::new();
86
87 let (ingest_tx, ingest_rx) = mpsc::channel::<CapturedEnvelope>(cfg.ingest_channel_capacity);
88 let (expiry_tx, expiry_rx) = mpsc::unbounded_channel::<ExpiryMsg>();
89
90 let events = ChannelSink::new(1024);
94 let composed: Arc<dyn EventSink> = Arc::new(ComposedSink::new(vec![
95 sink.clone(),
96 Arc::new(events.clone()),
97 ]));
98
99 let mailboxes = Arc::new(MailboxService::new(
100 pool.clone(),
101 cfg.clone(),
102 ingest_tx,
103 expiry_tx,
104 composed.clone(),
105 ));
106
107 let raw_dir = cfg.raw_dir();
108 let att_dir = cfg.att_dir();
109 let ingest_task = ingest::spawn(
110 pool.clone(),
111 composed.clone(),
112 ingest_rx,
113 raw_dir,
114 att_dir,
115 cancel.clone(),
116 );
117
118 let retention_task = retention::spawn_periodic(
119 pool.clone(),
120 cancel.clone(),
121 Duration::from_secs(3600),
122 );
123
124 let initial_expiries = db_mb::list_expiring(&pool).await?;
125 let ttl_task = lifecycle::spawn(
126 mailboxes.clone(),
127 expiry_rx,
128 cancel.clone(),
129 initial_expiries,
130 );
131
132 Ok(Service {
133 inner: Arc::new(Inner {
134 config: cfg,
135 pool,
136 mailboxes,
137 sink: composed,
138 events,
139 cancel,
140 http_handle: parking_lot::Mutex::new(None),
141 http_restart_lock: tokio::sync::Mutex::new(()),
142 log_controller: parking_lot::Mutex::new(None),
143 started: parking_lot::Mutex::new(false),
144 _ingest_task: ingest_task,
145 _retention_task: retention_task,
146 _ttl_task: ttl_task,
147 }),
148 })
149 }
150
151 pub fn subscribe(&self) -> broadcast::Receiver<CoreEvent> {
158 self.inner.events.subscribe()
159 }
160
161 pub fn set_log_level_controller(&self, ctl: Arc<dyn Fn(bool) + Send + Sync>) {
169 *self.inner.log_controller.lock() = Some(ctl);
170 }
171
172 pub async fn start_all(&self) -> Result<()> {
175 {
176 let mut s = self.inner.started.lock();
177 if *s {
178 return Ok(());
179 }
180 *s = true;
181 }
182
183 let advanced = db_settings::load_all(&self.inner.pool).await?.advanced;
188 self.inner
189 .mailboxes
190 .set_preserve_transcript(advanced.preserve_smtp_transcript);
191
192 let log_ctl = self.inner.log_controller.lock().clone();
196 if let Some(ctl) = log_ctl {
197 ctl(advanced.debug_logging);
198 }
199
200 self.inner.mailboxes.boot().await?;
201 let http = http::start(self.clone_handle()).await?;
202 *self.inner.http_handle.lock() = Some(http);
203
204 self.emit_status();
205 Ok(())
206 }
207
208 pub async fn restart_http(&self) -> Result<()> {
218 let _guard = self.inner.http_restart_lock.lock().await;
219
220 if !*self.inner.started.lock() {
221 return Ok(());
222 }
223
224 let old = self.inner.http_handle.lock().take();
225 if let Some(h) = old {
226 h.shutdown.cancel();
227 let _ = h.task.await;
228 }
229
230 let new = http::start(self.clone_handle()).await?;
231 let addr = new.addr;
232 *self.inner.http_handle.lock() = Some(new);
233 tracing::info!(target: "postcrate::http", addr = %addr, "http api restarted");
234
235 self.emit_status();
236 Ok(())
237 }
238
239 pub async fn stop_all(&self) -> Result<()> {
240 let http = self.inner.http_handle.lock().take();
244 if let Some(http) = http {
245 http.shutdown.cancel();
246 let _ = http.task.await;
247 }
248 self.inner.mailboxes.stop_all().await;
249 *self.inner.started.lock() = false;
250 self.emit_status();
251 Ok(())
252 }
253
254 pub fn status(&self) -> ServerStatus {
255 ServerStatus {
256 running_mailboxes: self.inner.mailboxes.running_count(),
257 http_running: self.inner.http_handle.lock().is_some(),
258 errors: Vec::new(),
259 }
260 }
261
262 pub fn http_addr(&self) -> Option<std::net::SocketAddr> {
264 self.inner.http_handle.lock().as_ref().map(|h| h.addr)
265 }
266
267 pub fn mailbox_addr(&self, mailbox_id: &str) -> Option<std::net::SocketAddr> {
269 self.inner.mailboxes.listener_addr(mailbox_id)
270 }
271
272 fn emit_status(&self) {
273 self.inner
274 .sink
275 .emit(CoreEvent::ServerStatusChanged { status: self.status() });
276 }
277
278 pub(crate) fn clone_handle(&self) -> ServiceHandle {
279 ServiceHandle {
280 inner: self.inner.clone(),
281 }
282 }
283
284 pub fn handle(&self) -> ServiceHandle {
285 self.clone_handle()
286 }
287
288 pub fn config(&self) -> &CoreConfig {
289 &self.inner.config
290 }
291
292 pub async fn list_mailboxes(&self, project_id: Option<&str>) -> Result<Vec<Mailbox>> {
295 db_mb::list(&self.inner.pool, project_id).await
296 }
297
298 pub async fn get_mailbox(&self, id: &str) -> Result<Mailbox> {
299 let row = db_mb::get(&self.inner.pool, id).await?;
300 let count = db_mb::count_emails(&self.inner.pool, id).await?;
301 Ok(row.with_count(count))
302 }
303
304 pub async fn create_mailbox(&self, input: CreateMailboxInput) -> Result<Mailbox> {
305 let mb = self
306 .inner
307 .mailboxes
308 .create(
309 &input.project_id,
310 &input.name,
311 input.kind,
312 input.port,
313 input.ttl_seconds,
314 input.implicit_tls,
315 )
316 .await?;
317 self.audit("user", "mailbox.create", Some("mailbox"), Some(&mb.id), None)
318 .await;
319 Ok(mb)
320 }
321
322 pub async fn update_mailbox(
323 &self,
324 id: &str,
325 patch: UpdateMailboxInput,
326 ) -> Result<Mailbox> {
327 let mb = self.inner.mailboxes.update(id, &patch).await?;
328 self.audit("user", "mailbox.update", Some("mailbox"), Some(id), None)
329 .await;
330 Ok(mb)
331 }
332
333 pub async fn delete_mailbox(&self, id: &str) -> Result<()> {
334 self.inner.mailboxes.delete(id).await?;
335 self.audit("user", "mailbox.delete", Some("mailbox"), Some(id), None)
336 .await;
337 Ok(())
338 }
339
340 pub async fn suggest_mailbox_port(&self, start: Option<u16>) -> Result<u16> {
351 use std::collections::HashSet;
352
353 let taken: HashSet<u16> = db_mb::list_all_ports(&self.inner.pool)
354 .await?
355 .into_iter()
356 .collect();
357 let host = self.inner.config.bind_host.as_ip();
358 let start = start.unwrap_or(1025);
359 crate::mailbox::ports::find_free_port(start, host, &taken).await
360 }
361
362 pub async fn start_mailbox(&self, id: &str) -> Result<()> {
367 db_mb::set_paused(&self.inner.pool, id, false).await?;
373 self.inner.mailboxes.start(id).await?;
374 self.audit("user", "mailbox.start", Some("mailbox"), Some(id), None)
375 .await;
376 Ok(())
377 }
378
379 pub async fn stop_mailbox(&self, id: &str) -> Result<()> {
382 db_mb::set_paused(&self.inner.pool, id, true).await?;
383 self.inner.mailboxes.stop(id).await?;
384 self.audit("user", "mailbox.stop", Some("mailbox"), Some(id), None)
385 .await;
386 Ok(())
387 }
388
389 pub async fn create_ephemeral(
390 &self,
391 input: CreateEphemeralInput,
392 ) -> Result<EphemeralHandle> {
393 let name = input.name.unwrap_or_else(|| format!("eph-{}", short_id()));
394 let mb = self
395 .inner
396 .mailboxes
397 .create(
398 &input.project_id,
399 &name,
400 MailboxKind::Ephemeral,
401 None,
402 Some(input.ttl_seconds),
403 false,
404 )
405 .await?;
406 let addr = self.inner.mailboxes.listener_addr(&mb.id);
407 let host = addr
408 .map(|a| a.ip().to_string())
409 .unwrap_or_else(|| self.inner.config.bind_host.as_ip().to_string());
410 let port = addr.map_or(mb.port, |a| a.port());
411 let expires_at = mb.expires_at.unwrap_or_else(|| {
412 Utc::now().timestamp_millis() + (input.ttl_seconds as i64 * 1000)
413 });
414 self.audit(
415 "user",
416 "mailbox.ephemeral.create",
417 Some("mailbox"),
418 Some(&mb.id),
419 None,
420 )
421 .await;
422 Ok(EphemeralHandle {
423 id: mb.id,
424 host,
425 port,
426 expires_at,
427 })
428 }
429
430 pub async fn list_emails(
433 &self,
434 mailbox_id: &str,
435 limit: u32,
436 offset: u32,
437 ) -> Result<Vec<EmailSummary>> {
438 db_emails::list(&self.inner.pool, mailbox_id, limit, offset).await
439 }
440
441 pub async fn get_email(&self, id: &str) -> Result<EmailDetail> {
442 db_emails::get_detail(&self.inner.pool, id).await
443 }
444
445 pub async fn get_email_raw(&self, id: &str) -> Result<Vec<u8>> {
446 let path = db_emails::get_raw_path(&self.inner.pool, id).await?;
447 Ok(tokio::fs::read(&path).await?)
448 }
449
450 pub async fn get_email_smtp_transcript(&self, id: &str) -> Result<Option<String>> {
454 let path = db_emails::get_raw_path(&self.inner.pool, id).await?;
455 let transcript_path =
456 crate::pipeline::ingest::transcript_path_for(std::path::Path::new(&path));
457 match tokio::fs::read_to_string(&transcript_path).await {
458 Ok(s) => Ok(Some(s)),
459 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
460 Err(e) => Err(e.into()),
461 }
462 }
463
464 pub async fn delete_email(&self, id: &str) -> Result<()> {
465 let raw_path = db_emails::delete(&self.inner.pool, id).await?;
466 delete_email_artifacts(&raw_path).await;
467 self.audit("user", "email.delete", Some("email"), Some(id), None).await;
468 Ok(())
469 }
470
471 pub async fn clear_mailbox(&self, mailbox_id: &str) -> Result<u64> {
475 let (n, paths) = db_emails::clear_mailbox(&self.inner.pool, mailbox_id, true).await?;
476 for p in &paths {
477 delete_email_artifacts(p).await;
478 }
479 self.audit(
480 "user",
481 "mailbox.clear",
482 Some("mailbox"),
483 Some(mailbox_id),
484 Some(serde_json::json!({"deleted": n})),
485 )
486 .await;
487 Ok(n)
488 }
489
490 pub async fn purge_mailbox(&self, mailbox_id: &str) -> Result<u64> {
493 let (n, paths) = db_emails::clear_mailbox(&self.inner.pool, mailbox_id, false).await?;
494 for p in &paths {
495 delete_email_artifacts(p).await;
496 }
497 self.audit(
498 "user",
499 "mailbox.purge",
500 Some("mailbox"),
501 Some(mailbox_id),
502 Some(serde_json::json!({"deleted": n})),
503 )
504 .await;
505 Ok(n)
506 }
507
508 pub async fn set_pinned(&self, id: &str, pinned: bool) -> Result<()> {
509 db_emails::set_pinned(&self.inner.pool, id, pinned).await?;
510 self.audit(
511 "user",
512 if pinned { "email.pin" } else { "email.unpin" },
513 Some("email"),
514 Some(id),
515 None,
516 )
517 .await;
518 Ok(())
519 }
520
521 pub async fn set_starred(&self, id: &str, starred: bool) -> Result<()> {
522 db_emails::set_starred(&self.inner.pool, id, starred).await?;
523 self.audit(
524 "user",
525 if starred { "email.star" } else { "email.unstar" },
526 Some("email"),
527 Some(id),
528 None,
529 )
530 .await;
531 Ok(())
532 }
533
534 pub async fn set_note(&self, id: &str, note: Option<&str>) -> Result<()> {
535 db_emails::set_note(&self.inner.pool, id, note).await?;
536 self.audit("user", "email.note", Some("email"), Some(id), None).await;
537 Ok(())
538 }
539
540 pub async fn set_tag(&self, id: &str, tag: Option<&str>) -> Result<()> {
544 db_emails::set_tag(&self.inner.pool, id, tag).await?;
545 self.audit("user", "email.tag", Some("email"), Some(id), None).await;
546 Ok(())
547 }
548
549 pub async fn release_email(
558 &self,
559 id: &str,
560 to: &str,
561 relay: &crate::RelayConfig,
562 ) -> Result<()> {
563 let detail = self.get_email(id).await?;
564 let raw = self.get_email_raw(id).await?;
565 let from = if detail.from.is_empty() {
566 "postcrate@localhost".to_string()
567 } else {
568 detail.from.clone()
569 };
570 crate::smtp::relay::relay_message(relay, &from, &[to.to_string()], &raw).await?;
571 self.audit(
572 "user",
573 "email.release",
574 Some("email"),
575 Some(id),
576 Some(serde_json::json!({
577 "to": to,
578 "relay": format!("{}:{}", relay.host, relay.port),
579 })),
580 )
581 .await;
582 Ok(())
583 }
584
585 pub async fn search_emails(
586 &self,
587 q: &str,
588 mailbox_id: Option<&str>,
589 limit: u32,
590 ) -> Result<Vec<EmailSummary>> {
591 db_emails::search(&self.inner.pool, q, mailbox_id, limit).await
592 }
593
594 pub async fn mark_read(&self, id: &str, read: bool) -> Result<()> {
595 db_emails::mark_read(&self.inner.pool, id, read).await
596 }
597
598 pub async fn get_attachment_blob(
601 &self,
602 attachment_id: &str,
603 ) -> Result<(Vec<u8>, Option<String>, Option<String>)> {
604 let (path, name, ct) =
605 crate::db::attachments::get_blob_path(&self.inner.pool, attachment_id).await?;
606 let bytes = tokio::fs::read(&path).await?;
607 Ok((bytes, name, ct))
608 }
609
610 pub async fn get_chaos(&self, mailbox_id: &str) -> Result<ChaosConfig> {
613 let _ = db_mb::get(&self.inner.pool, mailbox_id).await?;
615 chaos_configs::get(&self.inner.pool, mailbox_id).await
616 }
617
618 pub async fn set_chaos(&self, mailbox_id: &str, cfg: ChaosConfig) -> Result<()> {
619 let _ = db_mb::get(&self.inner.pool, mailbox_id).await?;
620 chaos_configs::upsert(&self.inner.pool, mailbox_id, &cfg).await?;
621 self.inner.mailboxes.refresh_chaos(mailbox_id).await?;
622 self.audit(
623 "user",
624 "chaos.update",
625 Some("mailbox"),
626 Some(mailbox_id),
627 Some(serde_json::to_value(&cfg)?),
628 )
629 .await;
630 Ok(())
631 }
632
633 pub async fn list_bounce_rules(&self, mailbox_id: &str) -> Result<Vec<BounceRule>> {
636 let _ = db_mb::get(&self.inner.pool, mailbox_id).await?;
637 bounce_rules::list(&self.inner.pool, mailbox_id).await
638 }
639
640 pub async fn upsert_bounce_rule(&self, rule: BounceRule) -> Result<BounceRule> {
641 let _ = db_mb::get(&self.inner.pool, &rule.mailbox_id).await?;
642 let saved = bounce_rules::upsert(&self.inner.pool, rule).await?;
643 self.inner.mailboxes.refresh_bounce(&saved.mailbox_id).await?;
644 self.audit(
645 "user",
646 "bounce.upsert",
647 Some("mailbox"),
648 Some(&saved.mailbox_id),
649 Some(serde_json::to_value(&saved)?),
650 )
651 .await;
652 Ok(saved)
653 }
654
655 pub async fn delete_bounce_rule(&self, id: &str) -> Result<()> {
656 bounce_rules::delete(&self.inner.pool, id).await?;
657 self.audit("user", "bounce.delete", Some("bounce_rule"), Some(id), None).await;
658 Ok(())
659 }
660
661 pub async fn get_settings(&self) -> Result<BackendSettings> {
664 db_settings::load_all(&self.inner.pool).await
665 }
666
667 pub async fn update_settings(&self, patch: SettingsPatch) -> Result<()> {
668 let section = patch.section();
669
670 let needs_http_restart = if let SettingsPatch::Network(next) = &patch {
675 let prev = db_settings::load_all(&self.inner.pool).await?.network;
676 prev.http_api_port != next.http_api_port
677 || prev.api_auth_token != next.api_auth_token
678 || prev.expose_on_lan != next.expose_on_lan
679 || prev.api_tls != next.api_tls
680 } else {
681 false
682 };
683
684 let mut new_debug_logging = None;
689 let mut new_preserve_transcript = None;
690 if let SettingsPatch::Advanced(next) = &patch {
691 let prev = db_settings::load_all(&self.inner.pool).await?.advanced;
692 if prev.debug_logging != next.debug_logging {
693 new_debug_logging = Some(next.debug_logging);
694 }
695 if prev.preserve_smtp_transcript != next.preserve_smtp_transcript {
696 new_preserve_transcript = Some(next.preserve_smtp_transcript);
697 }
698 }
699
700 db_settings::apply_patch(&self.inner.pool, &patch).await?;
701 self.inner.sink.emit(CoreEvent::SettingsChanged { section });
702
703 if needs_http_restart {
704 self.restart_http().await?;
705 }
706
707 if let Some(debug) = new_debug_logging {
708 let ctl = self.inner.log_controller.lock().clone();
709 if let Some(ctl) = ctl {
710 ctl(debug);
711 }
712 }
713
714 if let Some(enabled) = new_preserve_transcript {
715 self.inner.mailboxes.set_preserve_transcript(enabled);
716 }
717
718 Ok(())
719 }
720
721 pub async fn analyze_spam(
726 &self,
727 id: &str,
728 ) -> Result<crate::scenarios::spam::SpamReport> {
729 let parsed = self.parsed_email(id).await?;
730 Ok(crate::scenarios::spam::score(&parsed))
731 }
732
733 pub async fn analyze_links(
736 &self,
737 id: &str,
738 ) -> Result<crate::scenarios::links::LinkReport> {
739 let parsed = self.parsed_email(id).await?;
740 Ok(crate::scenarios::links::extract(&parsed))
741 }
742
743 pub async fn analyze_auth(
746 &self,
747 id: &str,
748 ) -> Result<crate::scenarios::auth::AuthReport> {
749 let parsed = self.parsed_email(id).await?;
750 Ok(crate::scenarios::auth::analyze(&parsed))
751 }
752
753 pub async fn analyze_list_unsub(
756 &self,
757 id: &str,
758 ) -> Result<crate::scenarios::list_unsub::UnsubReport> {
759 let parsed = self.parsed_email(id).await?;
760 Ok(crate::scenarios::list_unsub::analyze(&parsed))
761 }
762
763 async fn parsed_email(&self, id: &str) -> Result<crate::mail::parse::Parsed> {
768 let raw = self.get_email_raw(id).await?;
769 Ok(crate::mail::parse::parse(&raw))
770 }
771
772 pub async fn render_preview(
778 &self,
779 id: &str,
780 profile: crate::rendering::profile::Profile,
781 ) -> Result<crate::rendering::profile::RenderedPreview> {
782 let detail = self.get_email(id).await?;
783 let html = detail.html_body.unwrap_or_default();
784 Ok(crate::rendering::profile::apply(&html, profile))
785 }
786
787 pub async fn lint_html(&self, id: &str) -> Result<crate::rendering::lint::LintReport> {
789 let detail = self.get_email(id).await?;
790 let html = detail.html_body.unwrap_or_default();
791 Ok(crate::rendering::lint::lint(&html))
792 }
793
794 pub async fn audit_a11y(&self, id: &str) -> Result<crate::rendering::a11y::A11yReport> {
796 let detail = self.get_email(id).await?;
797 let html = detail.html_body.unwrap_or_default();
798 Ok(crate::rendering::a11y::audit(&html))
799 }
800
801 pub async fn export_recording(
807 &self,
808 mailbox_id: &str,
809 label: Option<String>,
810 ) -> Result<crate::recording::Recording> {
811 let _ = db_mb::get(&self.inner.pool, mailbox_id).await?;
813 let summaries = db_emails::list(&self.inner.pool, mailbox_id, u32::MAX, 0).await?;
814 let mut messages = Vec::with_capacity(summaries.len());
815 let mut summaries = summaries;
818 summaries.sort_by_key(|s| s.received_at);
819 for s in summaries {
820 let raw = self.get_email_raw(&s.id).await?;
821 let detail = self.get_email(&s.id).await?;
822 messages.push(crate::recording::RecordedMessage {
823 envelope: crate::recording::RecordedEnvelope {
824 mail_from: detail.from.clone(),
825 rcpt_to: detail.to.clone(),
826 received_at: detail.received_at,
827 ext_smtputf8: detail.ext_smtputf8,
828 ext_8bitmime: detail.ext_8bitmime,
829 subject: detail.subject.clone(),
830 },
831 raw_b64: crate::recording::encode_raw(&raw),
832 });
833 }
834 Ok(crate::recording::Recording {
835 version: crate::recording::RECORDING_VERSION,
836 exported_at: chrono::Utc::now().timestamp_millis(),
837 label,
838 messages,
839 })
840 }
841
842 pub async fn replay_recording(
848 &self,
849 mailbox_id: &str,
850 recording: &crate::recording::Recording,
851 ) -> Result<u64> {
852 recording.validate()?;
853 let _ = db_mb::get(&self.inner.pool, mailbox_id).await?;
854 let mailbox_id_owned = mailbox_id.to_string();
855 let ingest_tx = self.inner.mailboxes.ingest_tx();
856 let incoming_dir = self.inner.config.incoming_dir();
857 tokio::fs::create_dir_all(&incoming_dir).await?;
858
859 let mut count: u64 = 0;
860 for msg in &recording.messages {
861 let raw = crate::recording::decode_raw(msg)?;
862 let tmp = incoming_dir.join(format!("{}.tmp", uuid::Uuid::new_v4()));
867 tokio::fs::write(&tmp, &raw).await?;
868 let size = raw.len() as u64;
869 let env = crate::smtp::session::CapturedEnvelope {
870 mailbox_id: mailbox_id_owned.clone(),
871 received_at: msg.envelope.received_at,
872 mail_from: msg.envelope.mail_from.clone(),
873 rcpt_to: msg.envelope.rcpt_to.clone(),
874 raw: crate::smtp::data_reader::CapturedSource::OnDisk(tmp, size),
875 ext_smtputf8: msg.envelope.ext_smtputf8,
876 ext_8bitmime: msg.envelope.ext_8bitmime,
877 transcript: None,
881 };
882 ingest_tx
883 .send(env)
884 .await
885 .map_err(|e| crate::error::Error::Internal(format!("ingest closed: {e}")))?;
886 count += 1;
887 }
888 self.audit(
889 "user",
890 "recording.replay",
891 Some("mailbox"),
892 Some(mailbox_id),
893 Some(serde_json::json!({"count": count})),
894 )
895 .await;
896 Ok(count)
897 }
898
899 pub async fn replay_email(&self, id: &str, target_mailbox_id: &str) -> Result<()> {
903 let detail = self.get_email(id).await?;
904 let raw = self.get_email_raw(id).await?;
905 let addr = self
906 .inner
907 .mailboxes
908 .listener_addr(target_mailbox_id)
909 .ok_or_else(|| crate::error::Error::MailboxNotFound(target_mailbox_id.into()))?;
910 let from = if detail.from.is_empty() {
911 "postcrate@localhost".to_string()
912 } else {
913 detail.from.clone()
914 };
915 let rcpts = if detail.to.is_empty() {
916 vec!["postcrate@localhost".to_string()]
917 } else {
918 detail.to.clone()
919 };
920 crate::smtp::relay::relay_message(
921 &crate::RelayConfig {
922 host: addr.ip().to_string(),
923 port: addr.port(),
924 timeout_seconds: Some(10),
925 allowed_recipients: None,
926 },
927 &from,
928 &rcpts,
929 &raw,
930 )
931 .await?;
932 self.audit(
933 "user",
934 "email.replay",
935 Some("email"),
936 Some(id),
937 Some(serde_json::json!({"targetMailbox": target_mailbox_id})),
938 )
939 .await;
940 Ok(())
941 }
942
943 pub async fn wait_for_email(
958 &self,
959 predicate: crate::matcher::EmailPredicate,
960 timeout: std::time::Duration,
961 ) -> Result<crate::matcher::WaitOutcome> {
962 use crate::events::CoreEvent;
963 use tokio::sync::broadcast::error::RecvError;
964 use tokio::time::Instant;
965
966 let mut rx = self.subscribe();
967 let mut seen: Vec<EmailSummary> = Vec::new();
968
969 let initial = self.scan_for_match(&predicate, 100).await?;
971 if let Some(d) = initial.matched {
972 return Ok(crate::matcher::WaitOutcome {
973 matched: Some(d),
974 seen_during_wait: initial.seen,
975 });
976 }
977 seen.extend(initial.seen);
978
979 let deadline = Instant::now() + timeout;
980 loop {
981 let remaining = deadline.saturating_duration_since(Instant::now());
982 if remaining.is_zero() {
983 return Ok(crate::matcher::WaitOutcome {
984 matched: None,
985 seen_during_wait: seen,
986 });
987 }
988 match tokio::time::timeout(remaining, rx.recv()).await {
989 Err(_) => {
990 return Ok(crate::matcher::WaitOutcome {
991 matched: None,
992 seen_during_wait: seen,
993 });
994 }
995 Ok(Err(RecvError::Closed)) => {
996 return Ok(crate::matcher::WaitOutcome {
997 matched: None,
998 seen_during_wait: seen,
999 });
1000 }
1001 Ok(Err(RecvError::Lagged(_))) => {
1002 let catch = self.scan_for_match(&predicate, 100).await?;
1004 if catch.matched.is_some() {
1005 return Ok(crate::matcher::WaitOutcome {
1006 matched: catch.matched,
1007 seen_during_wait: seen,
1008 });
1009 }
1010 continue;
1011 }
1012 Ok(Ok(CoreEvent::NewEmail { mailbox_id, email })) => {
1013 if predicate.mailbox_id.as_ref().is_some_and(|m| m != &mailbox_id) {
1014 continue;
1015 }
1016 if predicate.matches_summary(&email) {
1017 let detail = self.get_email(&email.id).await?;
1018 if predicate.check(&detail).matched {
1019 return Ok(crate::matcher::WaitOutcome {
1020 matched: Some(detail),
1021 seen_during_wait: seen,
1022 });
1023 }
1024 }
1025 seen.push(email);
1026 }
1027 Ok(Ok(_)) => continue,
1028 }
1029 }
1030 }
1031
1032 pub async fn assert_email_matches(
1036 &self,
1037 id: &str,
1038 predicate: &crate::matcher::EmailPredicate,
1039 ) -> Result<crate::matcher::MatchResult> {
1040 let detail = self.get_email(id).await?;
1041 Ok(predicate.check(&detail))
1042 }
1043
1044 async fn scan_for_match(
1049 &self,
1050 predicate: &crate::matcher::EmailPredicate,
1051 limit: u32,
1052 ) -> Result<ScanResult> {
1053 let summaries = match &predicate.mailbox_id {
1054 Some(mb) => db_emails::list(&self.inner.pool, mb, limit, 0).await?,
1055 None => db_emails::list_recent_across(&self.inner.pool, limit).await?,
1056 };
1057 let mut seen = Vec::new();
1058 for s in summaries {
1059 if !predicate.matches_summary(&s) {
1060 seen.push(s);
1061 continue;
1062 }
1063 let detail = self.get_email(&s.id).await?;
1064 if predicate.check(&detail).matched {
1065 return Ok(ScanResult { matched: Some(detail), seen });
1066 }
1067 seen.push(s);
1068 }
1069 Ok(ScanResult { matched: None, seen })
1070 }
1071
1072 pub async fn list_webhooks(&self) -> Result<Vec<crate::db::webhooks::Webhook>> {
1075 crate::db::webhooks::list(&self.inner.pool).await
1076 }
1077
1078 pub async fn create_webhook(
1079 &self,
1080 input: crate::db::webhooks::CreateWebhook,
1081 ) -> Result<crate::db::webhooks::Webhook> {
1082 let hook = crate::db::webhooks::insert(&self.inner.pool, input).await?;
1083 self.audit(
1084 "user",
1085 "webhook.create",
1086 Some("webhook"),
1087 Some(&hook.id),
1088 None,
1089 )
1090 .await;
1091 Ok(hook)
1092 }
1093
1094 pub async fn delete_webhook(&self, id: &str) -> Result<()> {
1095 crate::db::webhooks::delete(&self.inner.pool, id).await?;
1096 self.audit("user", "webhook.delete", Some("webhook"), Some(id), None).await;
1097 Ok(())
1098 }
1099
1100 pub async fn list_forwarding_rules(
1103 &self,
1104 ) -> Result<Vec<crate::db::forwarding::ForwardingRule>> {
1105 crate::db::forwarding::list(&self.inner.pool).await
1106 }
1107
1108 pub async fn create_forwarding_rule(
1109 &self,
1110 input: crate::db::forwarding::CreateForwardingRule,
1111 ) -> Result<crate::db::forwarding::ForwardingRule> {
1112 let rule = crate::db::forwarding::insert(&self.inner.pool, input).await?;
1113 self.audit(
1114 "user",
1115 "forwarding.create",
1116 Some("forwarding_rule"),
1117 Some(&rule.id),
1118 None,
1119 )
1120 .await;
1121 Ok(rule)
1122 }
1123
1124 pub async fn delete_forwarding_rule(&self, id: &str) -> Result<()> {
1125 crate::db::forwarding::delete(&self.inner.pool, id).await?;
1126 self.audit(
1127 "user",
1128 "forwarding.delete",
1129 Some("forwarding_rule"),
1130 Some(id),
1131 None,
1132 )
1133 .await;
1134 Ok(())
1135 }
1136
1137 pub async fn list_audit(&self, limit: u32, offset: u32) -> Result<Vec<AuditEntry>> {
1140 db_audit::list(&self.inner.pool, limit, offset).await
1141 }
1142
1143 pub async fn clear_audit(&self, older_than_days: Option<u32>) -> Result<u64> {
1144 match older_than_days {
1145 Some(days) => db_audit::prune_older_than(&self.inner.pool, days).await,
1146 None => db_audit::clear_all(&self.inner.pool).await,
1147 }
1148 }
1149
1150 async fn audit(
1153 &self,
1154 actor: &str,
1155 action: &str,
1156 target_kind: Option<&str>,
1157 target_id: Option<&str>,
1158 metadata: Option<serde_json::Value>,
1159 ) {
1160 let res = db_audit::append(
1161 &self.inner.pool,
1162 AuditAppend {
1163 actor: actor.to_string(),
1164 action: action.to_string(),
1165 target_kind: target_kind.map(str::to_string),
1166 target_id: target_id.map(str::to_string),
1167 metadata,
1168 },
1169 )
1170 .await;
1171 if let Ok(entry) = res {
1172 self.inner
1173 .sink
1174 .emit(CoreEvent::AuditAppended { entry });
1175 }
1176 }
1177}
1178
/// Cloneable handle onto the same shared state as a `Service`.
#[derive(Clone)]
pub struct ServiceHandle {
    /// Shared state; cloning the handle clones this `Arc`.
    pub(crate) inner: Arc<Inner>,
}
1184
1185impl ServiceHandle {
1186 pub fn pool(&self) -> &SqlitePool {
1187 &self.inner.pool
1188 }
1189
1190 pub fn mailboxes(&self) -> &MailboxService {
1191 &self.inner.mailboxes
1192 }
1193
1194 pub fn config(&self) -> &CoreConfig {
1195 &self.inner.config
1196 }
1197
1198 pub fn sink(&self) -> &Arc<dyn EventSink> {
1199 &self.inner.sink
1200 }
1201
1202 pub fn as_service(&self) -> Service {
1203 Service {
1204 inner: self.inner.clone(),
1205 }
1206 }
1207}
1208
1209impl std::fmt::Debug for ServiceHandle {
1210 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1211 f.debug_struct("ServiceHandle").finish()
1212 }
1213}
1214
1215fn short_id() -> String {
1216 use rand::distributions::{Alphanumeric, DistString};
1217 Alphanumeric.sample_string(&mut rand::thread_rng(), 6).to_lowercase()
1218}
1219
1220async fn delete_email_artifacts(raw_path: &str) {
1227 let _ = tokio::fs::remove_file(raw_path).await;
1228 let _ = tokio::fs::remove_file(crate::pipeline::ingest::transcript_path_for(
1229 std::path::Path::new(raw_path),
1230 ))
1231 .await;
1232}