1use chrono::Utc;
8use serde::{Deserialize, Serialize};
9use sqlx::{Row, SqlitePool};
10use uuid::Uuid;
11
12use crate::error::{Error, Result};
13use crate::mailbox::kinds::MailboxKind;
14
15#[derive(Debug, Clone, Serialize, Deserialize)]
17#[cfg_attr(feature = "specta", derive(specta::Type))]
18#[serde(rename_all = "camelCase")]
19pub struct Mailbox {
20 pub id: String,
21 pub project_id: String,
22 pub name: String,
23 pub port: u16,
24 pub kind: MailboxKind,
25 pub ttl_seconds: Option<u64>,
26 pub expires_at: Option<i64>,
27 pub failed: bool,
28 pub fail_reason: Option<String>,
29 pub created_at: i64,
30 pub count: i64,
31 pub implicit_tls: bool,
35 pub paused: bool,
38}
39
40#[derive(Debug, Clone, Deserialize)]
41#[cfg_attr(feature = "specta", derive(specta::Type))]
42#[serde(rename_all = "camelCase")]
43pub struct CreateMailboxInput {
44 pub project_id: String,
45 pub name: String,
46 pub kind: MailboxKind,
47 pub port: Option<u16>,
48 pub ttl_seconds: Option<u64>,
49 #[serde(default)]
50 pub implicit_tls: bool,
51}
52
53#[derive(Debug, Clone, Default, Deserialize)]
54#[cfg_attr(feature = "specta", derive(specta::Type))]
55#[serde(rename_all = "camelCase")]
56pub struct UpdateMailboxInput {
57 pub name: Option<String>,
58 pub port: Option<u16>,
59 pub ttl_seconds: Option<Option<u64>>, }
61
62#[derive(Debug, Clone, Deserialize)]
63#[cfg_attr(feature = "specta", derive(specta::Type))]
64#[serde(rename_all = "camelCase")]
65pub struct CreateEphemeralInput {
66 pub project_id: String,
67 pub name: Option<String>,
68 pub ttl_seconds: u64,
69}
70
71#[derive(Debug, Clone, Serialize)]
72#[cfg_attr(feature = "specta", derive(specta::Type))]
73#[serde(rename_all = "camelCase")]
74pub struct EphemeralHandle {
75 pub id: String,
76 pub host: String,
77 pub port: u16,
78 pub expires_at: i64,
79}
80
81#[derive(Debug, Clone)]
82pub(crate) struct MailboxRow {
83 pub id: String,
84 pub project_id: String,
85 pub name: String,
86 pub port: u16,
87 pub kind: MailboxKind,
88 pub ttl_seconds: Option<u64>,
89 pub expires_at: Option<i64>,
90 pub failed: bool,
91 pub fail_reason: Option<String>,
92 pub created_at: i64,
93 pub implicit_tls: bool,
94 pub paused: bool,
95}
96
97impl MailboxRow {
98 pub(crate) fn with_count(self, count: i64) -> Mailbox {
99 Mailbox {
100 id: self.id,
101 project_id: self.project_id,
102 name: self.name,
103 port: self.port,
104 kind: self.kind,
105 ttl_seconds: self.ttl_seconds,
106 expires_at: self.expires_at,
107 failed: self.failed,
108 fail_reason: self.fail_reason,
109 created_at: self.created_at,
110 implicit_tls: self.implicit_tls,
111 paused: self.paused,
112 count,
113 }
114 }
115}
116
117pub(crate) async fn insert(
118 pool: &SqlitePool,
119 project_id: &str,
120 name: &str,
121 port: u16,
122 kind: MailboxKind,
123 ttl_seconds: Option<u64>,
124 implicit_tls: bool,
125) -> Result<MailboxRow> {
126 let now = Utc::now().timestamp_millis();
127 let expires_at = ttl_seconds.map(|t| now + (t as i64) * 1000);
128 let id = Uuid::new_v4().to_string();
129
130 let res = sqlx::query(
131 r"INSERT INTO mailboxes
132 (id, project_id, name, port, kind, ttl_seconds, expires_at, created_at, implicit_tls)
133 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
134 )
135 .bind(&id)
136 .bind(project_id)
137 .bind(name)
138 .bind(i64::from(port))
139 .bind(kind.as_str())
140 .bind(ttl_seconds.map(|t| t as i64))
141 .bind(expires_at)
142 .bind(now)
143 .bind(i64::from(implicit_tls))
144 .execute(pool)
145 .await;
146
147 match res {
148 Ok(_) => Ok(MailboxRow {
149 id,
150 project_id: project_id.to_string(),
151 name: name.to_string(),
152 port,
153 kind,
154 ttl_seconds,
155 expires_at,
156 failed: false,
157 fail_reason: None,
158 created_at: now,
159 implicit_tls,
160 paused: false,
161 }),
162 Err(sqlx::Error::Database(e)) if e.is_unique_violation() => {
163 let msg = e.message().to_lowercase();
165 if msg.contains("mailboxes.port") {
166 Err(Error::PortInUse(port))
167 } else {
168 Err(Error::DuplicateMailbox(name.to_string()))
169 }
170 }
171 Err(e) => Err(e.into()),
172 }
173}
174
175pub(crate) async fn get(pool: &SqlitePool, id: &str) -> Result<MailboxRow> {
176 let row = sqlx::query(
177 r"SELECT id, project_id, name, port, kind, ttl_seconds, expires_at,
178 failed, fail_reason, created_at, implicit_tls, paused
179 FROM mailboxes WHERE id = ?",
180 )
181 .bind(id)
182 .fetch_optional(pool)
183 .await?
184 .ok_or_else(|| Error::MailboxNotFound(id.to_string()))?;
185
186 Ok(row_to_mailbox_row(&row))
187}
188
189pub(crate) async fn list(pool: &SqlitePool, project_id: Option<&str>) -> Result<Vec<Mailbox>> {
190 let sql = if project_id.is_some() {
191 r"SELECT m.id, m.project_id, m.name, m.port, m.kind, m.ttl_seconds, m.expires_at,
192 m.failed, m.fail_reason, m.created_at, m.implicit_tls, m.paused,
193 COALESCE(c.cnt, 0) AS cnt
194 FROM mailboxes m
195 LEFT JOIN (SELECT mailbox_id, COUNT(*) AS cnt FROM emails GROUP BY mailbox_id) c
196 ON c.mailbox_id = m.id
197 WHERE m.project_id = ?
198 ORDER BY m.created_at ASC"
199 } else {
200 r"SELECT m.id, m.project_id, m.name, m.port, m.kind, m.ttl_seconds, m.expires_at,
201 m.failed, m.fail_reason, m.created_at, m.implicit_tls, m.paused,
202 COALESCE(c.cnt, 0) AS cnt
203 FROM mailboxes m
204 LEFT JOIN (SELECT mailbox_id, COUNT(*) AS cnt FROM emails GROUP BY mailbox_id) c
205 ON c.mailbox_id = m.id
206 ORDER BY m.created_at ASC"
207 };
208
209 let mut q = sqlx::query(sql);
210 if let Some(p) = project_id {
211 q = q.bind(p);
212 }
213 let rows = q.fetch_all(pool).await?;
214
215 Ok(rows
216 .into_iter()
217 .map(|row| {
218 let mb = row_to_mailbox_row(&row);
219 let count: i64 = row.try_get("cnt").unwrap_or(0);
220 mb.with_count(count)
221 })
222 .collect())
223}
224
225pub(crate) async fn count_emails(pool: &SqlitePool, mailbox_id: &str) -> Result<i64> {
226 let row = sqlx::query("SELECT COUNT(*) AS c FROM emails WHERE mailbox_id = ?")
227 .bind(mailbox_id)
228 .fetch_one(pool)
229 .await?;
230 Ok(row.try_get::<i64, _>("c").unwrap_or(0))
231}
232
233pub(crate) async fn update(
234 pool: &SqlitePool,
235 id: &str,
236 patch: &UpdateMailboxInput,
237) -> Result<MailboxRow> {
238 let current = get(pool, id).await?;
239
240 let new_name = patch.name.clone().unwrap_or(current.name.clone());
241 let new_port = patch.port.unwrap_or(current.port);
242 let new_ttl = match patch.ttl_seconds {
243 None => current.ttl_seconds,
244 Some(v) => v,
245 };
246
247 let res = sqlx::query(
248 r"UPDATE mailboxes
249 SET name = ?, port = ?, ttl_seconds = ?
250 WHERE id = ?",
251 )
252 .bind(&new_name)
253 .bind(i64::from(new_port))
254 .bind(new_ttl.map(|t| t as i64))
255 .bind(id)
256 .execute(pool)
257 .await;
258
259 match res {
260 Ok(_) => get(pool, id).await,
261 Err(sqlx::Error::Database(e)) if e.is_unique_violation() => {
262 let msg = e.message().to_lowercase();
263 if msg.contains("mailboxes.port") {
264 Err(Error::PortInUse(new_port))
265 } else {
266 Err(Error::DuplicateMailbox(new_name))
267 }
268 }
269 Err(e) => Err(e.into()),
270 }
271}
272
273pub(crate) async fn delete(pool: &SqlitePool, id: &str) -> Result<()> {
274 let res = sqlx::query("DELETE FROM mailboxes WHERE id = ?")
275 .bind(id)
276 .execute(pool)
277 .await?;
278 if res.rows_affected() == 0 {
279 return Err(Error::MailboxNotFound(id.to_string()));
280 }
281 Ok(())
282}
283
284pub(crate) async fn mark_failed(
285 pool: &SqlitePool,
286 id: &str,
287 reason: Option<&str>,
288) -> Result<()> {
289 sqlx::query("UPDATE mailboxes SET failed = 1, fail_reason = ? WHERE id = ?")
290 .bind(reason)
291 .bind(id)
292 .execute(pool)
293 .await?;
294 Ok(())
295}
296
297pub(crate) async fn clear_failed(pool: &SqlitePool, id: &str) -> Result<()> {
298 sqlx::query("UPDATE mailboxes SET failed = 0, fail_reason = NULL WHERE id = ?")
299 .bind(id)
300 .execute(pool)
301 .await?;
302 Ok(())
303}
304
305pub(crate) async fn set_paused(pool: &SqlitePool, id: &str, paused: bool) -> Result<()> {
309 sqlx::query("UPDATE mailboxes SET paused = ? WHERE id = ?")
310 .bind(i64::from(paused))
311 .bind(id)
312 .execute(pool)
313 .await?;
314 Ok(())
315}
316
317pub(crate) async fn sweep_expired_ephemerals(pool: &SqlitePool) -> Result<Vec<String>> {
320 let now = Utc::now().timestamp_millis();
321 let rows = sqlx::query(
322 r"SELECT id FROM mailboxes
323 WHERE kind = 'ephemeral'
324 AND (expires_at IS NULL OR expires_at < ?)",
325 )
326 .bind(now)
327 .fetch_all(pool)
328 .await?;
329
330 let ids: Vec<String> = rows
331 .iter()
332 .filter_map(|r| r.try_get::<String, _>("id").ok())
333 .collect();
334
335 if !ids.is_empty() {
336 sqlx::query(
337 r"DELETE FROM mailboxes
338 WHERE kind = 'ephemeral'
339 AND (expires_at IS NULL OR expires_at < ?)",
340 )
341 .bind(now)
342 .execute(pool)
343 .await?;
344 }
345 Ok(ids)
346}
347
348pub(crate) async fn list_all_ports(pool: &SqlitePool) -> Result<Vec<u16>> {
349 let rows = sqlx::query("SELECT port FROM mailboxes").fetch_all(pool).await?;
350 Ok(rows
351 .into_iter()
352 .filter_map(|r| r.try_get::<i64, _>("port").ok())
353 .map(|n| n as u16)
354 .collect())
355}
356
357pub(crate) async fn list_active_for_boot(pool: &SqlitePool) -> Result<Vec<MailboxRow>> {
358 let rows = sqlx::query(
359 r"SELECT id, project_id, name, port, kind, ttl_seconds, expires_at,
360 failed, fail_reason, created_at, implicit_tls, paused
361 FROM mailboxes",
362 )
363 .fetch_all(pool)
364 .await?;
365 Ok(rows.iter().map(row_to_mailbox_row).collect())
366}
367
368pub(crate) async fn list_expiring(pool: &SqlitePool) -> Result<Vec<(String, i64)>> {
369 let rows = sqlx::query(
370 r"SELECT id, expires_at FROM mailboxes
371 WHERE kind = 'ephemeral' AND expires_at IS NOT NULL",
372 )
373 .fetch_all(pool)
374 .await?;
375 Ok(rows
376 .into_iter()
377 .filter_map(|r| {
378 let id: String = r.try_get("id").ok()?;
379 let exp: i64 = r.try_get("expires_at").ok()?;
380 Some((id, exp))
381 })
382 .collect())
383}
384
385fn row_to_mailbox_row(row: &sqlx::sqlite::SqliteRow) -> MailboxRow {
386 let kind_str: String = row.try_get("kind").unwrap_or_else(|_| "primary".into());
387 let kind = MailboxKind::from_str(&kind_str).unwrap_or(MailboxKind::Primary);
388 let ttl_seconds: Option<i64> = row.try_get("ttl_seconds").ok();
389 let port_i64: i64 = row.try_get("port").unwrap_or(0);
390 let failed_i: i64 = row.try_get("failed").unwrap_or(0);
391
392 MailboxRow {
393 id: row.try_get("id").unwrap_or_default(),
394 project_id: row.try_get("project_id").unwrap_or_default(),
395 name: row.try_get("name").unwrap_or_default(),
396 port: port_i64 as u16,
397 kind,
398 ttl_seconds: ttl_seconds.map(|t| t as u64),
399 expires_at: row.try_get("expires_at").ok(),
400 failed: failed_i != 0,
401 fail_reason: row.try_get("fail_reason").ok(),
402 created_at: row.try_get("created_at").unwrap_or(0),
403 implicit_tls: row.try_get::<i64, _>("implicit_tls").unwrap_or(0) != 0,
404 paused: row.try_get::<i64, _>("paused").unwrap_or(0) != 0,
405 }
406}