Skip to main content

postcrate_core/
service.rs

1//! The single public façade. The built-in HTTP routes, downstream
2//! command shims, and CLI subcommands all speak only to this type.
3
4use std::sync::Arc;
5use std::time::Duration;
6
7use chrono::Utc;
8use sqlx::SqlitePool;
9use tokio::sync::mpsc;
10use tokio_util::sync::CancellationToken;
11
12use tokio::sync::broadcast;
13
14use crate::config::CoreConfig;
15use crate::db::audit::{AuditAppend, AuditEntry};
16use crate::db::bounce_rules::BounceRule;
17use crate::db::chaos_configs::ChaosConfig;
18use crate::db::emails::{EmailDetail, EmailSummary};
19use crate::db::mailboxes::{
20    CreateEphemeralInput, CreateMailboxInput, EphemeralHandle, Mailbox, UpdateMailboxInput,
21};
22use crate::db::settings::{BackendSettings, SettingsPatch};
23use crate::db::{audit as db_audit, bounce_rules, chaos_configs, emails as db_emails,
24                mailboxes as db_mb, pool as db_pool, settings as db_settings};
25use crate::error::Result;
26use crate::events::{ChannelSink, ComposedSink, CoreEvent, EventSink, ServerStatus};
27use crate::http;
28use crate::mailbox::kinds::MailboxKind;
29use crate::mailbox::lifecycle::{self, ExpiryMsg};
30use crate::mailbox::service::MailboxService;
31use crate::pipeline::{ingest, retention};
32use crate::smtp::session::CapturedEnvelope;
33
34pub struct Service {
35    inner: Arc<Inner>,
36}
37
38#[derive(Debug)]
39struct ScanResult {
40    matched: Option<EmailDetail>,
41    seen: Vec<EmailSummary>,
42}
43
44pub(crate) struct Inner {
45    pub config: CoreConfig,
46    pub pool: SqlitePool,
47    pub mailboxes: Arc<MailboxService>,
48    pub sink: Arc<dyn EventSink>,
49    /// In-process fan-out for `Service::subscribe`. Wrapped under the
50    /// user-provided sink via `ComposedSink` so every emission reaches
51    /// both the embedder's sink and any in-process `subscribe()`
52    /// consumers (CLI tail, SSE endpoint, `wait_for_email`).
53    pub events: ChannelSink,
54    pub cancel: CancellationToken,
55    http_handle: parking_lot::Mutex<Option<http::HttpServerHandle>>,
56    /// Serializes `restart_http` so two concurrent network-pref updates
57    /// can't race and orphan a listener. Held across the shutdown +
58    /// rebind cycle, which is why it's tokio rather than parking_lot.
59    http_restart_lock: tokio::sync::Mutex<()>,
60    /// Hook the embedder installs to flip the global tracing filter
61    /// when `AdvancedPrefs.debug_logging` changes. The engine doesn't
62    /// own the subscriber stack — it just signals the level it wants.
63    log_controller: parking_lot::Mutex<Option<Arc<dyn Fn(bool) + Send + Sync>>>,
64    started: parking_lot::Mutex<bool>,
65    /// Hold these so they're cancelled with the service.
66    _ingest_task: tokio::task::JoinHandle<()>,
67    _retention_task: tokio::task::JoinHandle<()>,
68    _ttl_task: tokio::task::JoinHandle<()>,
69}
70
71impl std::fmt::Debug for Service {
72    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
73        f.debug_struct("Service").finish()
74    }
75}
76
77impl Service {
78    /// Build the engine: open the DB, migrate, spawn workers, prepare
79    /// listeners. Doesn't bind any sockets — call [`Service::start_all`].
80    pub async fn build(cfg: CoreConfig, sink: Arc<dyn EventSink>) -> Result<Service> {
81        cfg.ensure_dirs().await?;
82        let pool = db_pool::open(&cfg.db_path).await?;
83        crate::db::migrate::run(&pool).await?;
84
85        let cancel = CancellationToken::new();
86
87        let (ingest_tx, ingest_rx) = mpsc::channel::<CapturedEnvelope>(cfg.ingest_channel_capacity);
88        let (expiry_tx, expiry_rx) = mpsc::unbounded_channel::<ExpiryMsg>();
89
90        // Build a composed sink: the user's sink + our internal channel
91        // sink. Every `emit` reaches both. Subscribers (CLI tail, SSE,
92        // wait_for_email) read from `events`.
93        let events = ChannelSink::new(1024);
94        let composed: Arc<dyn EventSink> = Arc::new(ComposedSink::new(vec![
95            sink.clone(),
96            Arc::new(events.clone()),
97        ]));
98
99        let mailboxes = Arc::new(MailboxService::new(
100            pool.clone(),
101            cfg.clone(),
102            ingest_tx,
103            expiry_tx,
104            composed.clone(),
105        ));
106
107        let raw_dir = cfg.raw_dir();
108        let att_dir = cfg.att_dir();
109        let ingest_task = ingest::spawn(
110            pool.clone(),
111            composed.clone(),
112            ingest_rx,
113            raw_dir,
114            att_dir,
115            cancel.clone(),
116        );
117
118        let retention_task = retention::spawn_periodic(
119            pool.clone(),
120            cancel.clone(),
121            Duration::from_secs(3600),
122        );
123
124        let initial_expiries = db_mb::list_expiring(&pool).await?;
125        let ttl_task = lifecycle::spawn(
126            mailboxes.clone(),
127            expiry_rx,
128            cancel.clone(),
129            initial_expiries,
130        );
131
132        Ok(Service {
133            inner: Arc::new(Inner {
134                config: cfg,
135                pool,
136                mailboxes,
137                sink: composed,
138                events,
139                cancel,
140                http_handle: parking_lot::Mutex::new(None),
141                http_restart_lock: tokio::sync::Mutex::new(()),
142                log_controller: parking_lot::Mutex::new(None),
143                started: parking_lot::Mutex::new(false),
144                _ingest_task: ingest_task,
145                _retention_task: retention_task,
146                _ttl_task: ttl_task,
147            }),
148        })
149    }
150
151    /// Subscribe to engine events. Each call returns a fresh
152    /// `broadcast::Receiver`; consumers that lag behind by more than
153    /// the channel capacity (currently 1024) will receive `Lagged`
154    /// errors and must reconnect. This is the canonical way for the
155    /// CLI `tail`, the SSE endpoint, the `wait_for_email` primitive,
156    /// and external consumers to observe events.
157    pub fn subscribe(&self) -> broadcast::Receiver<CoreEvent> {
158        self.inner.events.subscribe()
159    }
160
161    /// Install a callback the engine invokes when the persisted
162    /// `AdvancedPrefs.debug_logging` value changes (and once at
163    /// `start_all` to honor the initial value). The embedder owns the
164    /// `tracing_subscriber` stack; this lets the engine drive a filter
165    /// reload without depending on the subscriber implementation.
166    ///
167    /// Idempotent — calling it again replaces the previous controller.
168    pub fn set_log_level_controller(&self, ctl: Arc<dyn Fn(bool) + Send + Sync>) {
169        *self.inner.log_controller.lock() = Some(ctl);
170    }
171
172    /// Start every persisted mailbox's listener + the HTTP API.
173    /// Idempotent.
174    pub async fn start_all(&self) -> Result<()> {
175        {
176            let mut s = self.inner.started.lock();
177            if *s {
178                return Ok(());
179            }
180            *s = true;
181        }
182
183        // Apply persisted Advanced prefs *before* listeners spin up so
184        // the very first SMTP session and the very first log line
185        // honor the user's choices. Both `mailboxes.boot()` and
186        // `http::start()` are wired to read these on the fly.
187        let advanced = db_settings::load_all(&self.inner.pool).await?.advanced;
188        self.inner
189            .mailboxes
190            .set_preserve_transcript(advanced.preserve_smtp_transcript);
191
192        // Clone the log controller out of the mutex before any `.await`
193        // so the parking_lot guard isn't held across the suspension
194        // point (which would make the future `!Send`).
195        let log_ctl = self.inner.log_controller.lock().clone();
196        if let Some(ctl) = log_ctl {
197            ctl(advanced.debug_logging);
198        }
199
200        self.inner.mailboxes.boot().await?;
201        let http = http::start(self.clone_handle()).await?;
202        *self.inner.http_handle.lock() = Some(http);
203
204        self.emit_status();
205        Ok(())
206    }
207
208    /// Tear down the running HTTP listener (if any) and start a fresh
209    /// one with whatever's currently persisted in `BackendSettings`.
210    /// Used by `update_settings` to apply network-pref changes live
211    /// without requiring an app restart.
212    ///
213    /// No-op when the service hasn't reached `start_all` yet — the
214    /// boot path will pick up the new settings when it gets there. If
215    /// the rebind fails, the previous listener stays torn down and the
216    /// error is propagated so the caller can revert the patch.
217    pub async fn restart_http(&self) -> Result<()> {
218        let _guard = self.inner.http_restart_lock.lock().await;
219
220        if !*self.inner.started.lock() {
221            return Ok(());
222        }
223
224        let old = self.inner.http_handle.lock().take();
225        if let Some(h) = old {
226            h.shutdown.cancel();
227            let _ = h.task.await;
228        }
229
230        let new = http::start(self.clone_handle()).await?;
231        let addr = new.addr;
232        *self.inner.http_handle.lock() = Some(new);
233        tracing::info!(target: "postcrate::http", addr = %addr, "http api restarted");
234
235        self.emit_status();
236        Ok(())
237    }
238
239    pub async fn stop_all(&self) -> Result<()> {
240        // Scope the lock so the MutexGuard is dropped before we await
241        // — otherwise this future is `!Send` and can't be spawned from
242        // a multi-thread runtime (e.g. Tauri's app shutdown hook).
243        let http = self.inner.http_handle.lock().take();
244        if let Some(http) = http {
245            http.shutdown.cancel();
246            let _ = http.task.await;
247        }
248        self.inner.mailboxes.stop_all().await;
249        *self.inner.started.lock() = false;
250        self.emit_status();
251        Ok(())
252    }
253
254    pub fn status(&self) -> ServerStatus {
255        ServerStatus {
256            running_mailboxes: self.inner.mailboxes.running_count(),
257            http_running: self.inner.http_handle.lock().is_some(),
258            errors: Vec::new(),
259        }
260    }
261
262    /// The HTTP API's bound socket address, if the server is running.
263    pub fn http_addr(&self) -> Option<std::net::SocketAddr> {
264        self.inner.http_handle.lock().as_ref().map(|h| h.addr)
265    }
266
267    /// The bound SMTP socket address for a given mailbox listener.
268    pub fn mailbox_addr(&self, mailbox_id: &str) -> Option<std::net::SocketAddr> {
269        self.inner.mailboxes.listener_addr(mailbox_id)
270    }
271
272    fn emit_status(&self) {
273        self.inner
274            .sink
275            .emit(CoreEvent::ServerStatusChanged { status: self.status() });
276    }
277
278    pub(crate) fn clone_handle(&self) -> ServiceHandle {
279        ServiceHandle {
280            inner: self.inner.clone(),
281        }
282    }
283
284    pub fn handle(&self) -> ServiceHandle {
285        self.clone_handle()
286    }
287
288    pub fn config(&self) -> &CoreConfig {
289        &self.inner.config
290    }
291
292    // ---- Mailboxes ----
293
294    pub async fn list_mailboxes(&self, project_id: Option<&str>) -> Result<Vec<Mailbox>> {
295        db_mb::list(&self.inner.pool, project_id).await
296    }
297
298    pub async fn get_mailbox(&self, id: &str) -> Result<Mailbox> {
299        let row = db_mb::get(&self.inner.pool, id).await?;
300        let count = db_mb::count_emails(&self.inner.pool, id).await?;
301        Ok(row.with_count(count))
302    }
303
304    pub async fn create_mailbox(&self, input: CreateMailboxInput) -> Result<Mailbox> {
305        let mb = self
306            .inner
307            .mailboxes
308            .create(
309                &input.project_id,
310                &input.name,
311                input.kind,
312                input.port,
313                input.ttl_seconds,
314                input.implicit_tls,
315            )
316            .await?;
317        self.audit("user", "mailbox.create", Some("mailbox"), Some(&mb.id), None)
318            .await;
319        Ok(mb)
320    }
321
322    pub async fn update_mailbox(
323        &self,
324        id: &str,
325        patch: UpdateMailboxInput,
326    ) -> Result<Mailbox> {
327        let mb = self.inner.mailboxes.update(id, &patch).await?;
328        self.audit("user", "mailbox.update", Some("mailbox"), Some(id), None)
329            .await;
330        Ok(mb)
331    }
332
333    pub async fn delete_mailbox(&self, id: &str) -> Result<()> {
334        self.inner.mailboxes.delete(id).await?;
335        self.audit("user", "mailbox.delete", Some("mailbox"), Some(id), None)
336            .await;
337        Ok(())
338    }
339
340    /// Suggest a free SMTP port for a new mailbox. Walks upward from
341    /// `start` (defaulting to 1025), skipping ports already in use by
342    /// another mailbox in this DB and probe-binding each candidate so
343    /// external collisions are caught too. Cheap (microseconds per
344    /// probe on loopback) so we don't bother caching engine-side.
345    ///
346    /// Advisory only: the actual `create_mailbox` is authoritative and
347    /// will return `PortInUse` if the suggestion was beaten by a
348    /// racing create or by a process that grabbed the port between
349    /// the suggestion and the bind.
350    pub async fn suggest_mailbox_port(&self, start: Option<u16>) -> Result<u16> {
351        use std::collections::HashSet;
352
353        let taken: HashSet<u16> = db_mb::list_all_ports(&self.inner.pool)
354            .await?
355            .into_iter()
356            .collect();
357        let host = self.inner.config.bind_host.as_ip();
358        let start = start.unwrap_or(1025);
359        crate::mailbox::ports::find_free_port(start, host, &taken).await
360    }
361
362    /// Bring a mailbox's SMTP listener online and clear the persistent
363    /// `paused` intent. Idempotent — calling it on a running mailbox
364    /// returns Ok. Bind failures propagate so the UI can revert its
365    /// optimistic update and show the actual reason.
366    pub async fn start_mailbox(&self, id: &str) -> Result<()> {
367        // Clear the user-intent flag first so a successful start sticks
368        // across a subsequent restart; if the start itself fails the
369        // intent still says "should be running" which matches what the
370        // user just asked for, and `failed=1` makes the pill red so
371        // they see the problem.
372        db_mb::set_paused(&self.inner.pool, id, false).await?;
373        self.inner.mailboxes.start(id).await?;
374        self.audit("user", "mailbox.start", Some("mailbox"), Some(id), None)
375            .await;
376        Ok(())
377    }
378
379    /// Tear down a mailbox's SMTP listener and remember the user
380    /// intent so the listener stays down across restarts. Idempotent.
381    pub async fn stop_mailbox(&self, id: &str) -> Result<()> {
382        db_mb::set_paused(&self.inner.pool, id, true).await?;
383        self.inner.mailboxes.stop(id).await?;
384        self.audit("user", "mailbox.stop", Some("mailbox"), Some(id), None)
385            .await;
386        Ok(())
387    }
388
389    pub async fn create_ephemeral(
390        &self,
391        input: CreateEphemeralInput,
392    ) -> Result<EphemeralHandle> {
393        let name = input.name.unwrap_or_else(|| format!("eph-{}", short_id()));
394        let mb = self
395            .inner
396            .mailboxes
397            .create(
398                &input.project_id,
399                &name,
400                MailboxKind::Ephemeral,
401                None,
402                Some(input.ttl_seconds),
403                false,
404            )
405            .await?;
406        let addr = self.inner.mailboxes.listener_addr(&mb.id);
407        let host = addr
408            .map(|a| a.ip().to_string())
409            .unwrap_or_else(|| self.inner.config.bind_host.as_ip().to_string());
410        let port = addr.map_or(mb.port, |a| a.port());
411        let expires_at = mb.expires_at.unwrap_or_else(|| {
412            Utc::now().timestamp_millis() + (input.ttl_seconds as i64 * 1000)
413        });
414        self.audit(
415            "user",
416            "mailbox.ephemeral.create",
417            Some("mailbox"),
418            Some(&mb.id),
419            None,
420        )
421        .await;
422        Ok(EphemeralHandle {
423            id: mb.id,
424            host,
425            port,
426            expires_at,
427        })
428    }
429
430    // ---- Emails ----
431
432    pub async fn list_emails(
433        &self,
434        mailbox_id: &str,
435        limit: u32,
436        offset: u32,
437    ) -> Result<Vec<EmailSummary>> {
438        db_emails::list(&self.inner.pool, mailbox_id, limit, offset).await
439    }
440
441    pub async fn get_email(&self, id: &str) -> Result<EmailDetail> {
442        db_emails::get_detail(&self.inner.pool, id).await
443    }
444
445    pub async fn get_email_raw(&self, id: &str) -> Result<Vec<u8>> {
446        let path = db_emails::get_raw_path(&self.inner.pool, id).await?;
447        Ok(tokio::fs::read(&path).await?)
448    }
449
450    /// Load the SMTP transcript captured at ingest time, if present.
451    /// Returns `Ok(None)` when the email exists but the transcript pref
452    /// was off when it was received (the common case for older mail).
453    pub async fn get_email_smtp_transcript(&self, id: &str) -> Result<Option<String>> {
454        let path = db_emails::get_raw_path(&self.inner.pool, id).await?;
455        let transcript_path =
456            crate::pipeline::ingest::transcript_path_for(std::path::Path::new(&path));
457        match tokio::fs::read_to_string(&transcript_path).await {
458            Ok(s) => Ok(Some(s)),
459            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
460            Err(e) => Err(e.into()),
461        }
462    }
463
464    pub async fn delete_email(&self, id: &str) -> Result<()> {
465        let raw_path = db_emails::delete(&self.inner.pool, id).await?;
466        delete_email_artifacts(&raw_path).await;
467        self.audit("user", "email.delete", Some("email"), Some(id), None).await;
468        Ok(())
469    }
470
471    /// Clear all non-pinned emails from a mailbox. Pinned emails (set
472    /// via [`Self::set_pinned`]) survive. Use
473    /// [`Self::purge_mailbox`] to wipe everything including pinned.
474    pub async fn clear_mailbox(&self, mailbox_id: &str) -> Result<u64> {
475        let (n, paths) = db_emails::clear_mailbox(&self.inner.pool, mailbox_id, true).await?;
476        for p in &paths {
477            delete_email_artifacts(p).await;
478        }
479        self.audit(
480            "user",
481            "mailbox.clear",
482            Some("mailbox"),
483            Some(mailbox_id),
484            Some(serde_json::json!({"deleted": n})),
485        )
486        .await;
487        Ok(n)
488    }
489
490    /// Wipe every email in a mailbox — pinned ones included. Use
491    /// only for explicit "purge" actions (rare).
492    pub async fn purge_mailbox(&self, mailbox_id: &str) -> Result<u64> {
493        let (n, paths) = db_emails::clear_mailbox(&self.inner.pool, mailbox_id, false).await?;
494        for p in &paths {
495            delete_email_artifacts(p).await;
496        }
497        self.audit(
498            "user",
499            "mailbox.purge",
500            Some("mailbox"),
501            Some(mailbox_id),
502            Some(serde_json::json!({"deleted": n})),
503        )
504        .await;
505        Ok(n)
506    }
507
508    pub async fn set_pinned(&self, id: &str, pinned: bool) -> Result<()> {
509        db_emails::set_pinned(&self.inner.pool, id, pinned).await?;
510        self.audit(
511            "user",
512            if pinned { "email.pin" } else { "email.unpin" },
513            Some("email"),
514            Some(id),
515            None,
516        )
517        .await;
518        Ok(())
519    }
520
521    pub async fn set_starred(&self, id: &str, starred: bool) -> Result<()> {
522        db_emails::set_starred(&self.inner.pool, id, starred).await?;
523        self.audit(
524            "user",
525            if starred { "email.star" } else { "email.unstar" },
526            Some("email"),
527            Some(id),
528            None,
529        )
530        .await;
531        Ok(())
532    }
533
534    pub async fn set_note(&self, id: &str, note: Option<&str>) -> Result<()> {
535        db_emails::set_note(&self.inner.pool, id, note).await?;
536        self.audit("user", "email.note", Some("email"), Some(id), None).await;
537        Ok(())
538    }
539
540    /// Set or clear the tag on an email. Plus-addressing
541    /// (`user+tag@host`) sets this automatically at ingest; this
542    /// method lets users override or clear it manually.
543    pub async fn set_tag(&self, id: &str, tag: Option<&str>) -> Result<()> {
544        db_emails::set_tag(&self.inner.pool, id, tag).await?;
545        self.audit("user", "email.tag", Some("email"), Some(id), None).await;
546        Ok(())
547    }
548
549    /// Forward a captured email to a real address via an external SMTP
550    /// relay. The original raw bytes are sent unchanged; the envelope
551    /// `MAIL FROM` defaults to the captured sender and the envelope
552    /// recipient is the new `to`.
553    ///
554    /// Audit-logged (PROD.md §9.3): this is the only public-Service
555    /// method that produces outbound network traffic, so users need a
556    /// clear trail of when releases happen.
557    pub async fn release_email(
558        &self,
559        id: &str,
560        to: &str,
561        relay: &crate::RelayConfig,
562    ) -> Result<()> {
563        let detail = self.get_email(id).await?;
564        let raw = self.get_email_raw(id).await?;
565        let from = if detail.from.is_empty() {
566            "postcrate@localhost".to_string()
567        } else {
568            detail.from.clone()
569        };
570        crate::smtp::relay::relay_message(relay, &from, &[to.to_string()], &raw).await?;
571        self.audit(
572            "user",
573            "email.release",
574            Some("email"),
575            Some(id),
576            Some(serde_json::json!({
577                "to": to,
578                "relay": format!("{}:{}", relay.host, relay.port),
579            })),
580        )
581        .await;
582        Ok(())
583    }
584
585    pub async fn search_emails(
586        &self,
587        q: &str,
588        mailbox_id: Option<&str>,
589        limit: u32,
590    ) -> Result<Vec<EmailSummary>> {
591        db_emails::search(&self.inner.pool, q, mailbox_id, limit).await
592    }
593
594    pub async fn mark_read(&self, id: &str, read: bool) -> Result<()> {
595        db_emails::mark_read(&self.inner.pool, id, read).await
596    }
597
598    // ---- Attachments ----
599
600    pub async fn get_attachment_blob(
601        &self,
602        attachment_id: &str,
603    ) -> Result<(Vec<u8>, Option<String>, Option<String>)> {
604        let (path, name, ct) =
605            crate::db::attachments::get_blob_path(&self.inner.pool, attachment_id).await?;
606        let bytes = tokio::fs::read(&path).await?;
607        Ok((bytes, name, ct))
608    }
609
610    // ---- Chaos ----
611
612    pub async fn get_chaos(&self, mailbox_id: &str) -> Result<ChaosConfig> {
613        // Surface NotFound for an unknown mailbox.
614        let _ = db_mb::get(&self.inner.pool, mailbox_id).await?;
615        chaos_configs::get(&self.inner.pool, mailbox_id).await
616    }
617
618    pub async fn set_chaos(&self, mailbox_id: &str, cfg: ChaosConfig) -> Result<()> {
619        let _ = db_mb::get(&self.inner.pool, mailbox_id).await?;
620        chaos_configs::upsert(&self.inner.pool, mailbox_id, &cfg).await?;
621        self.inner.mailboxes.refresh_chaos(mailbox_id).await?;
622        self.audit(
623            "user",
624            "chaos.update",
625            Some("mailbox"),
626            Some(mailbox_id),
627            Some(serde_json::to_value(&cfg)?),
628        )
629        .await;
630        Ok(())
631    }
632
633    // ---- Bounces ----
634
635    pub async fn list_bounce_rules(&self, mailbox_id: &str) -> Result<Vec<BounceRule>> {
636        let _ = db_mb::get(&self.inner.pool, mailbox_id).await?;
637        bounce_rules::list(&self.inner.pool, mailbox_id).await
638    }
639
640    pub async fn upsert_bounce_rule(&self, rule: BounceRule) -> Result<BounceRule> {
641        let _ = db_mb::get(&self.inner.pool, &rule.mailbox_id).await?;
642        let saved = bounce_rules::upsert(&self.inner.pool, rule).await?;
643        self.inner.mailboxes.refresh_bounce(&saved.mailbox_id).await?;
644        self.audit(
645            "user",
646            "bounce.upsert",
647            Some("mailbox"),
648            Some(&saved.mailbox_id),
649            Some(serde_json::to_value(&saved)?),
650        )
651        .await;
652        Ok(saved)
653    }
654
655    pub async fn delete_bounce_rule(&self, id: &str) -> Result<()> {
656        bounce_rules::delete(&self.inner.pool, id).await?;
657        self.audit("user", "bounce.delete", Some("bounce_rule"), Some(id), None).await;
658        Ok(())
659    }
660
661    // ---- Settings ----
662
663    pub async fn get_settings(&self) -> Result<BackendSettings> {
664        db_settings::load_all(&self.inner.pool).await
665    }
666
667    pub async fn update_settings(&self, patch: SettingsPatch) -> Result<()> {
668        let section = patch.section();
669
670        // For network changes, decide whether the running HTTP listener
671        // needs a rebind by comparing the incoming patch against the
672        // currently-persisted values. We snapshot *before* apply_patch
673        // so a partial DB write can't be misread as "nothing changed".
674        let needs_http_restart = if let SettingsPatch::Network(next) = &patch {
675            let prev = db_settings::load_all(&self.inner.pool).await?.network;
676            prev.http_api_port != next.http_api_port
677                || prev.api_auth_token != next.api_auth_token
678                || prev.expose_on_lan != next.expose_on_lan
679                || prev.api_tls != next.api_tls
680        } else {
681            false
682        };
683
684        // For Advanced changes, capture the deltas we react to live:
685        // debug logging (filter reload) and SMTP transcript capture
686        // (flag flip shared with running listeners). Same
687        // snapshot-before-write rationale as HTTP.
688        let mut new_debug_logging = None;
689        let mut new_preserve_transcript = None;
690        if let SettingsPatch::Advanced(next) = &patch {
691            let prev = db_settings::load_all(&self.inner.pool).await?.advanced;
692            if prev.debug_logging != next.debug_logging {
693                new_debug_logging = Some(next.debug_logging);
694            }
695            if prev.preserve_smtp_transcript != next.preserve_smtp_transcript {
696                new_preserve_transcript = Some(next.preserve_smtp_transcript);
697            }
698        }
699
700        db_settings::apply_patch(&self.inner.pool, &patch).await?;
701        self.inner.sink.emit(CoreEvent::SettingsChanged { section });
702
703        if needs_http_restart {
704            self.restart_http().await?;
705        }
706
707        if let Some(debug) = new_debug_logging {
708            let ctl = self.inner.log_controller.lock().clone();
709            if let Some(ctl) = ctl {
710                ctl(debug);
711            }
712        }
713
714        if let Some(enabled) = new_preserve_transcript {
715            self.inner.mailboxes.set_preserve_transcript(enabled);
716        }
717
718        Ok(())
719    }
720
721    // ---- Scenarios ----
722
723    /// Score a captured email's spam-likelihood.
724    /// Local heuristics only; no network.
725    pub async fn analyze_spam(
726        &self,
727        id: &str,
728    ) -> Result<crate::scenarios::spam::SpamReport> {
729        let parsed = self.parsed_email(id).await?;
730        Ok(crate::scenarios::spam::score(&parsed))
731    }
732
733    /// Extract + classify every link in a captured email
734    ///. Does not HEAD-check links.
735    pub async fn analyze_links(
736        &self,
737        id: &str,
738    ) -> Result<crate::scenarios::links::LinkReport> {
739        let parsed = self.parsed_email(id).await?;
740        Ok(crate::scenarios::links::extract(&parsed))
741    }
742
743    /// Inspect SPF / DKIM / DMARC headers and predict pass/fail
744    ///. Header inspection only.
745    pub async fn analyze_auth(
746        &self,
747        id: &str,
748    ) -> Result<crate::scenarios::auth::AuthReport> {
749        let parsed = self.parsed_email(id).await?;
750        Ok(crate::scenarios::auth::analyze(&parsed))
751    }
752
753    /// Validate the `List-Unsubscribe` / `List-Unsubscribe-Post`
754    /// headers per RFC 2369 + RFC 8058.
755    pub async fn analyze_list_unsub(
756        &self,
757        id: &str,
758    ) -> Result<crate::scenarios::list_unsub::UnsubReport> {
759        let parsed = self.parsed_email(id).await?;
760        Ok(crate::scenarios::list_unsub::analyze(&parsed))
761    }
762
763    /// Helper: re-parse a captured email's raw bytes from disk.
764    /// We don't cache the full `Parsed` in SQLite (only its JSON
765    /// projection), so scenarios that need attachments or full
766    /// headers re-parse on demand.
767    async fn parsed_email(&self, id: &str) -> Result<crate::mail::parse::Parsed> {
768        let raw = self.get_email_raw(id).await?;
769        Ok(crate::mail::parse::parse(&raw))
770    }
771
772    // ---- Rendering ----
773
774    /// Render the email's HTML body through a client profile
775    ///. Returns the transformed HTML + a list of
776    /// transforms that ran.
777    pub async fn render_preview(
778        &self,
779        id: &str,
780        profile: crate::rendering::profile::Profile,
781    ) -> Result<crate::rendering::profile::RenderedPreview> {
782        let detail = self.get_email(id).await?;
783        let html = detail.html_body.unwrap_or_default();
784        Ok(crate::rendering::profile::apply(&html, profile))
785    }
786
787    /// Lint the email's HTML for known client incompatibilities.
788    pub async fn lint_html(&self, id: &str) -> Result<crate::rendering::lint::LintReport> {
789        let detail = self.get_email(id).await?;
790        let html = detail.html_body.unwrap_or_default();
791        Ok(crate::rendering::lint::lint(&html))
792    }
793
794    /// Accessibility check on the email's HTML.
795    pub async fn audit_a11y(&self, id: &str) -> Result<crate::rendering::a11y::A11yReport> {
796        let detail = self.get_email(id).await?;
797        let html = detail.html_body.unwrap_or_default();
798        Ok(crate::rendering::a11y::audit(&html))
799    }
800
801    // ---- Recordings ----
802
803    /// Snapshot every email in a mailbox into a portable
804    /// `.postcrate` recording. The result serializes to
805    /// JSON via serde; the caller is responsible for persisting it.
806    pub async fn export_recording(
807        &self,
808        mailbox_id: &str,
809        label: Option<String>,
810    ) -> Result<crate::recording::Recording> {
811        // Existence check + 404 propagation.
812        let _ = db_mb::get(&self.inner.pool, mailbox_id).await?;
813        let summaries = db_emails::list(&self.inner.pool, mailbox_id, u32::MAX, 0).await?;
814        let mut messages = Vec::with_capacity(summaries.len());
815        // Walk in chronological order so replay observes the same
816        // received-at ordering as the original capture.
817        let mut summaries = summaries;
818        summaries.sort_by_key(|s| s.received_at);
819        for s in summaries {
820            let raw = self.get_email_raw(&s.id).await?;
821            let detail = self.get_email(&s.id).await?;
822            messages.push(crate::recording::RecordedMessage {
823                envelope: crate::recording::RecordedEnvelope {
824                    mail_from: detail.from.clone(),
825                    rcpt_to: detail.to.clone(),
826                    received_at: detail.received_at,
827                    ext_smtputf8: detail.ext_smtputf8,
828                    ext_8bitmime: detail.ext_8bitmime,
829                    subject: detail.subject.clone(),
830                },
831                raw_b64: crate::recording::encode_raw(&raw),
832            });
833        }
834        Ok(crate::recording::Recording {
835            version: crate::recording::RECORDING_VERSION,
836            exported_at: chrono::Utc::now().timestamp_millis(),
837            label,
838            messages,
839        })
840    }
841
842    /// Replay a recording's messages straight into a mailbox by
843    /// pushing them through the ingest worker. SMTP listeners,
844    /// chaos, and bounce rules are bypassed — this is for fixture
845    /// restoration, not for re-running a scenario.
846    /// Use [`Self::replay_email`] for a single SMTP-driven re-send.
847    pub async fn replay_recording(
848        &self,
849        mailbox_id: &str,
850        recording: &crate::recording::Recording,
851    ) -> Result<u64> {
852        recording.validate()?;
853        let _ = db_mb::get(&self.inner.pool, mailbox_id).await?;
854        let mailbox_id_owned = mailbox_id.to_string();
855        let ingest_tx = self.inner.mailboxes.ingest_tx();
856        let incoming_dir = self.inner.config.incoming_dir();
857        tokio::fs::create_dir_all(&incoming_dir).await?;
858
859        let mut count: u64 = 0;
860        for msg in &recording.messages {
861            let raw = crate::recording::decode_raw(msg)?;
862            // Spill the bytes to a temp file so the ingest worker
863            // picks up an OnDisk source (matches the real DATA path
864            // for messages > spill threshold; behavior is identical
865            // for smaller payloads).
866            let tmp = incoming_dir.join(format!("{}.tmp", uuid::Uuid::new_v4()));
867            tokio::fs::write(&tmp, &raw).await?;
868            let size = raw.len() as u64;
869            let env = crate::smtp::session::CapturedEnvelope {
870                mailbox_id: mailbox_id_owned.clone(),
871                received_at: msg.envelope.received_at,
872                mail_from: msg.envelope.mail_from.clone(),
873                rcpt_to: msg.envelope.rcpt_to.clone(),
874                raw: crate::smtp::data_reader::CapturedSource::OnDisk(tmp, size),
875                ext_smtputf8: msg.envelope.ext_smtputf8,
876                ext_8bitmime: msg.envelope.ext_8bitmime,
877                // Replays never carry a transcript — the original
878                // session's wire conversation isn't reproducible from
879                // the recording payload.
880                transcript: None,
881            };
882            ingest_tx
883                .send(env)
884                .await
885                .map_err(|e| crate::error::Error::Internal(format!("ingest closed: {e}")))?;
886            count += 1;
887        }
888        self.audit(
889            "user",
890            "recording.replay",
891            Some("mailbox"),
892            Some(mailbox_id),
893            Some(serde_json::json!({"count": count})),
894        )
895        .await;
896        Ok(count)
897    }
898
899    /// Re-inject one captured email's raw bytes into a (possibly
900    /// different) mailbox via the local SMTP listener — exercises
901    /// chaos + bounce rules + parsing the way a real send would.
902    pub async fn replay_email(&self, id: &str, target_mailbox_id: &str) -> Result<()> {
903        let detail = self.get_email(id).await?;
904        let raw = self.get_email_raw(id).await?;
905        let addr = self
906            .inner
907            .mailboxes
908            .listener_addr(target_mailbox_id)
909            .ok_or_else(|| crate::error::Error::MailboxNotFound(target_mailbox_id.into()))?;
910        let from = if detail.from.is_empty() {
911            "postcrate@localhost".to_string()
912        } else {
913            detail.from.clone()
914        };
915        let rcpts = if detail.to.is_empty() {
916            vec!["postcrate@localhost".to_string()]
917        } else {
918            detail.to.clone()
919        };
920        crate::smtp::relay::relay_message(
921            &crate::RelayConfig {
922                host: addr.ip().to_string(),
923                port: addr.port(),
924                timeout_seconds: Some(10),
925                allowed_recipients: None,
926            },
927            &from,
928            &rcpts,
929            &raw,
930        )
931        .await?;
932        self.audit(
933            "user",
934            "email.replay",
935            Some("email"),
936            Some(id),
937            Some(serde_json::json!({"targetMailbox": target_mailbox_id})),
938        )
939        .await;
940        Ok(())
941    }
942
943    // ---- Wait / Match ----
944
945    /// Block up to `timeout` for an email that satisfies `predicate`.
946    ///
947    /// Sequence:
948    ///   1. Subscribe to the event stream first (so we don't miss an
949    ///      email that arrives between scan + subscribe).
950    ///   2. Do a one-shot scan of recent emails in case it already
951    ///      arrived before the call.
952    ///   3. Otherwise consume the broadcast until timeout.
953    ///
954    /// The returned [`crate::matcher::WaitOutcome`] always carries the
955    /// list of emails seen during the wait, so callers can distinguish
956    /// "no email at all" from "email arrived but didn't match".
957    pub async fn wait_for_email(
958        &self,
959        predicate: crate::matcher::EmailPredicate,
960        timeout: std::time::Duration,
961    ) -> Result<crate::matcher::WaitOutcome> {
962        use crate::events::CoreEvent;
963        use tokio::sync::broadcast::error::RecvError;
964        use tokio::time::Instant;
965
966        let mut rx = self.subscribe();
967        let mut seen: Vec<EmailSummary> = Vec::new();
968
969        // Initial scan — most recent 100 emails in scope.
970        let initial = self.scan_for_match(&predicate, 100).await?;
971        if let Some(d) = initial.matched {
972            return Ok(crate::matcher::WaitOutcome {
973                matched: Some(d),
974                seen_during_wait: initial.seen,
975            });
976        }
977        seen.extend(initial.seen);
978
979        let deadline = Instant::now() + timeout;
980        loop {
981            let remaining = deadline.saturating_duration_since(Instant::now());
982            if remaining.is_zero() {
983                return Ok(crate::matcher::WaitOutcome {
984                    matched: None,
985                    seen_during_wait: seen,
986                });
987            }
988            match tokio::time::timeout(remaining, rx.recv()).await {
989                Err(_) => {
990                    return Ok(crate::matcher::WaitOutcome {
991                        matched: None,
992                        seen_during_wait: seen,
993                    });
994                }
995                Ok(Err(RecvError::Closed)) => {
996                    return Ok(crate::matcher::WaitOutcome {
997                        matched: None,
998                        seen_during_wait: seen,
999                    });
1000                }
1001                Ok(Err(RecvError::Lagged(_))) => {
1002                    // Catch up via a full scan and keep looping.
1003                    let catch = self.scan_for_match(&predicate, 100).await?;
1004                    if catch.matched.is_some() {
1005                        return Ok(crate::matcher::WaitOutcome {
1006                            matched: catch.matched,
1007                            seen_during_wait: seen,
1008                        });
1009                    }
1010                    continue;
1011                }
1012                Ok(Ok(CoreEvent::NewEmail { mailbox_id, email })) => {
1013                    if predicate.mailbox_id.as_ref().is_some_and(|m| m != &mailbox_id) {
1014                        continue;
1015                    }
1016                    if predicate.matches_summary(&email) {
1017                        let detail = self.get_email(&email.id).await?;
1018                        if predicate.check(&detail).matched {
1019                            return Ok(crate::matcher::WaitOutcome {
1020                                matched: Some(detail),
1021                                seen_during_wait: seen,
1022                            });
1023                        }
1024                    }
1025                    seen.push(email);
1026                }
1027                Ok(Ok(_)) => continue,
1028            }
1029        }
1030    }
1031
1032    /// Check a specific email against a predicate. The full
1033    /// [`crate::matcher::MatchResult`] is returned (including any
1034    /// mismatches) so callers can produce a structured diff.
1035    pub async fn assert_email_matches(
1036        &self,
1037        id: &str,
1038        predicate: &crate::matcher::EmailPredicate,
1039    ) -> Result<crate::matcher::MatchResult> {
1040        let detail = self.get_email(id).await?;
1041        Ok(predicate.check(&detail))
1042    }
1043
1044    /// Implementation detail of [`Self::wait_for_email`]: scan up to
1045    /// `limit` most-recent emails (across all mailboxes, or filtered
1046    /// by `predicate.mailbox_id`) and return either the first match
1047    /// or the list of all candidates seen.
1048    async fn scan_for_match(
1049        &self,
1050        predicate: &crate::matcher::EmailPredicate,
1051        limit: u32,
1052    ) -> Result<ScanResult> {
1053        let summaries = match &predicate.mailbox_id {
1054            Some(mb) => db_emails::list(&self.inner.pool, mb, limit, 0).await?,
1055            None => db_emails::list_recent_across(&self.inner.pool, limit).await?,
1056        };
1057        let mut seen = Vec::new();
1058        for s in summaries {
1059            if !predicate.matches_summary(&s) {
1060                seen.push(s);
1061                continue;
1062            }
1063            let detail = self.get_email(&s.id).await?;
1064            if predicate.check(&detail).matched {
1065                return Ok(ScanResult { matched: Some(detail), seen });
1066            }
1067            seen.push(s);
1068        }
1069        Ok(ScanResult { matched: None, seen })
1070    }
1071
1072    // ---- Webhooks ----
1073
1074    pub async fn list_webhooks(&self) -> Result<Vec<crate::db::webhooks::Webhook>> {
1075        crate::db::webhooks::list(&self.inner.pool).await
1076    }
1077
1078    pub async fn create_webhook(
1079        &self,
1080        input: crate::db::webhooks::CreateWebhook,
1081    ) -> Result<crate::db::webhooks::Webhook> {
1082        let hook = crate::db::webhooks::insert(&self.inner.pool, input).await?;
1083        self.audit(
1084            "user",
1085            "webhook.create",
1086            Some("webhook"),
1087            Some(&hook.id),
1088            None,
1089        )
1090        .await;
1091        Ok(hook)
1092    }
1093
1094    pub async fn delete_webhook(&self, id: &str) -> Result<()> {
1095        crate::db::webhooks::delete(&self.inner.pool, id).await?;
1096        self.audit("user", "webhook.delete", Some("webhook"), Some(id), None).await;
1097        Ok(())
1098    }
1099
1100    // ---- Forwarding ----
1101
1102    pub async fn list_forwarding_rules(
1103        &self,
1104    ) -> Result<Vec<crate::db::forwarding::ForwardingRule>> {
1105        crate::db::forwarding::list(&self.inner.pool).await
1106    }
1107
1108    pub async fn create_forwarding_rule(
1109        &self,
1110        input: crate::db::forwarding::CreateForwardingRule,
1111    ) -> Result<crate::db::forwarding::ForwardingRule> {
1112        let rule = crate::db::forwarding::insert(&self.inner.pool, input).await?;
1113        self.audit(
1114            "user",
1115            "forwarding.create",
1116            Some("forwarding_rule"),
1117            Some(&rule.id),
1118            None,
1119        )
1120        .await;
1121        Ok(rule)
1122    }
1123
1124    pub async fn delete_forwarding_rule(&self, id: &str) -> Result<()> {
1125        crate::db::forwarding::delete(&self.inner.pool, id).await?;
1126        self.audit(
1127            "user",
1128            "forwarding.delete",
1129            Some("forwarding_rule"),
1130            Some(id),
1131            None,
1132        )
1133        .await;
1134        Ok(())
1135    }
1136
1137    // ---- Audit ----
1138
1139    pub async fn list_audit(&self, limit: u32, offset: u32) -> Result<Vec<AuditEntry>> {
1140        db_audit::list(&self.inner.pool, limit, offset).await
1141    }
1142
1143    pub async fn clear_audit(&self, older_than_days: Option<u32>) -> Result<u64> {
1144        match older_than_days {
1145            Some(days) => db_audit::prune_older_than(&self.inner.pool, days).await,
1146            None => db_audit::clear_all(&self.inner.pool).await,
1147        }
1148    }
1149
1150    // ---- internal ----
1151
1152    async fn audit(
1153        &self,
1154        actor: &str,
1155        action: &str,
1156        target_kind: Option<&str>,
1157        target_id: Option<&str>,
1158        metadata: Option<serde_json::Value>,
1159    ) {
1160        let res = db_audit::append(
1161            &self.inner.pool,
1162            AuditAppend {
1163                actor: actor.to_string(),
1164                action: action.to_string(),
1165                target_kind: target_kind.map(str::to_string),
1166                target_id: target_id.map(str::to_string),
1167                metadata,
1168            },
1169        )
1170        .await;
1171        if let Ok(entry) = res {
1172            self.inner
1173                .sink
1174                .emit(CoreEvent::AuditAppended { entry });
1175        }
1176    }
1177}
1178
1179/// Cheap-to-clone view into a [`Service`]. The HTTP layer uses this.
1180#[derive(Clone)]
1181pub struct ServiceHandle {
1182    pub(crate) inner: Arc<Inner>,
1183}
1184
1185impl ServiceHandle {
1186    pub fn pool(&self) -> &SqlitePool {
1187        &self.inner.pool
1188    }
1189
1190    pub fn mailboxes(&self) -> &MailboxService {
1191        &self.inner.mailboxes
1192    }
1193
1194    pub fn config(&self) -> &CoreConfig {
1195        &self.inner.config
1196    }
1197
1198    pub fn sink(&self) -> &Arc<dyn EventSink> {
1199        &self.inner.sink
1200    }
1201
1202    pub fn as_service(&self) -> Service {
1203        Service {
1204            inner: self.inner.clone(),
1205        }
1206    }
1207}
1208
1209impl std::fmt::Debug for ServiceHandle {
1210    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1211        f.debug_struct("ServiceHandle").finish()
1212    }
1213}
1214
1215fn short_id() -> String {
1216    use rand::distributions::{Alphanumeric, DistString};
1217    Alphanumeric.sample_string(&mut rand::thread_rng(), 6).to_lowercase()
1218}
1219
1220/// Best-effort cleanup for an email's on-disk artifacts: the raw blob
1221/// and, when present, the SMTP transcript sidecar. Used by
1222/// `delete_email`, `clear_mailbox`, and `purge_mailbox` so no path
1223/// ever forgets to drop the transcript alongside the email it belongs
1224/// to. Retention has its own copy of this helper to avoid pulling
1225/// `service.rs` into `pipeline/`.
1226async fn delete_email_artifacts(raw_path: &str) {
1227    let _ = tokio::fs::remove_file(raw_path).await;
1228    let _ = tokio::fs::remove_file(crate::pipeline::ingest::transcript_path_for(
1229        std::path::Path::new(raw_path),
1230    ))
1231    .await;
1232}