1use chrono::{DateTime, Utc};
7use sea_orm::{ConnectionTrait, DatabaseBackend, DatabaseConnection, Statement, Value};
8use serde::{Deserialize, Serialize};
9
10use crate::error::Error;
11
12#[derive(Debug, Clone, Serialize, Deserialize)]
18pub struct Deployment {
19 pub id: i64,
21 pub identifier: String,
23 pub owner_key: String,
25 pub source_ref: Option<String>,
27 pub artifact_location: Option<String>,
29 pub byte_size: Option<i64>,
31 pub status: DeploymentStatus,
33 pub artifact_deleted_at: Option<DateTime<Utc>>,
36 pub terminated_at: Option<DateTime<Utc>>,
38 pub created_at: DateTime<Utc>,
40}
41
42#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
48#[serde(rename_all = "snake_case")]
49pub enum DeploymentStatus {
50 Building,
52 Ready,
54 Failed,
56}
57
58impl DeploymentStatus {
59 fn from_str(s: &str) -> Result<Self, Error> {
60 match s {
61 "building" => Ok(Self::Building),
62 "ready" => Ok(Self::Ready),
63 "failed" => Ok(Self::Failed),
64 other => Err(Error::custom(format!("unknown deployment status: {other}"))),
65 }
66 }
67}
68
69#[derive(Clone)]
78pub struct Deployments {
79 db: DatabaseConnection,
80}
81
82impl Deployments {
83 pub fn new(db: DatabaseConnection) -> Self {
85 Self { db }
86 }
87
88 pub async fn create(
92 &self,
93 owner_key: &str,
94 source_ref: Option<&str>,
95 ) -> Result<Deployment, Error> {
96 let identifier = uuid::Uuid::new_v4().to_string();
97 let now = Utc::now();
98 let now_iso = now.to_rfc3339();
99 let backend = self.db.get_database_backend();
100
101 let (p1, p2, p3, p4) = (
102 ph(backend, 1)?,
103 ph(backend, 2)?,
104 ph(backend, 3)?,
105 ph(backend, 4)?,
106 );
107 let sql = format!(
108 "INSERT INTO deployments \
109 (identifier, owner_key, source_ref, artifact_location, byte_size, status, \
110 artifact_deleted_at, terminated_at, created_at) \
111 VALUES ({p1}, {p2}, {p3}, NULL, NULL, 'building', NULL, NULL, {p4}) \
112 RETURNING id"
113 );
114 let stmt = Statement::from_sql_and_values(
115 backend,
116 &sql,
117 [
118 Value::String(Some(Box::new(identifier.clone()))),
119 Value::String(Some(Box::new(owner_key.to_string()))),
120 source_ref.map_or(Value::String(None), |s| {
121 Value::String(Some(Box::new(s.to_string())))
122 }),
123 Value::String(Some(Box::new(now_iso.clone()))),
124 ],
125 );
126
127 let row = self
128 .db
129 .query_one(stmt)
130 .await
131 .map_err(Error::Db)?
132 .ok_or_else(|| Error::custom("create: INSERT RETURNING returned no row"))?;
133
134 let id: i64 = row
135 .try_get_by::<i64, _>("id")
136 .map_err(|e| Error::custom(format!("create: parse id: {e}")))?;
137
138 Ok(Deployment {
139 id,
140 identifier,
141 owner_key: owner_key.to_string(),
142 source_ref: source_ref.map(str::to_string),
143 artifact_location: None,
144 byte_size: None,
145 status: DeploymentStatus::Building,
146 artifact_deleted_at: None,
147 terminated_at: None,
148 created_at: now,
149 })
150 }
151
152 pub async fn mark_ready(
158 &self,
159 id: i64,
160 artifact_location: &str,
161 byte_size: i64,
162 ) -> Result<(), Error> {
163 let now_iso = Utc::now().to_rfc3339();
164 let backend = self.db.get_database_backend();
165 let (p1, p2, p3, p4) = (
166 ph(backend, 1)?,
167 ph(backend, 2)?,
168 ph(backend, 3)?,
169 ph(backend, 4)?,
170 );
171 let sql = format!(
172 "UPDATE deployments \
173 SET status = 'ready', artifact_location = {p1}, byte_size = {p2}, terminated_at = {p3} \
174 WHERE id = {p4} AND status = 'building'"
175 );
176 let stmt = Statement::from_sql_and_values(
177 backend,
178 &sql,
179 [
180 Value::String(Some(Box::new(artifact_location.to_string()))),
181 Value::BigInt(Some(byte_size)),
182 Value::String(Some(Box::new(now_iso))),
183 Value::BigInt(Some(id)),
184 ],
185 );
186 let result = self.db.execute(stmt).await.map_err(Error::Db)?;
187 if result.rows_affected() == 0 {
188 self.get(id).await?;
192 return Err(Error::custom(format!(
193 "deployment {id} is not in building state; transition to ready rejected"
194 )));
195 }
196 Ok(())
197 }
198
199 pub async fn mark_failed(&self, id: i64, error: &str) -> Result<(), Error> {
205 tracing::warn!(
206 deployment_id = id,
207 error = error,
208 "deployment marked failed"
209 );
210 let now_iso = Utc::now().to_rfc3339();
211 let backend = self.db.get_database_backend();
212 let (p1, p2) = (ph(backend, 1)?, ph(backend, 2)?);
213 let sql = format!(
214 "UPDATE deployments \
215 SET status = 'failed', terminated_at = {p1} \
216 WHERE id = {p2} AND status = 'building'"
217 );
218 let stmt = Statement::from_sql_and_values(
219 backend,
220 &sql,
221 [
222 Value::String(Some(Box::new(now_iso))),
223 Value::BigInt(Some(id)),
224 ],
225 );
226 let result = self.db.execute(stmt).await.map_err(Error::Db)?;
227 if result.rows_affected() == 0 {
228 self.get(id).await?;
229 return Err(Error::custom(format!(
230 "deployment {id} is not in building state; transition to failed rejected"
231 )));
232 }
233 Ok(())
234 }
235
236 pub async fn get(&self, id: i64) -> Result<Deployment, Error> {
238 let backend = self.db.get_database_backend();
239 let p1 = ph(backend, 1)?;
240 let sql = format!(
241 "SELECT id, identifier, owner_key, source_ref, artifact_location, byte_size, \
242 status, artifact_deleted_at, terminated_at, created_at \
243 FROM deployments WHERE id = {p1}"
244 );
245 let stmt = Statement::from_sql_and_values(backend, &sql, [Value::BigInt(Some(id))]);
246 let row = self
247 .db
248 .query_one(stmt)
249 .await
250 .map_err(Error::Db)?
251 .ok_or(Error::NotFound { id })?;
252 parse_deployment_row(&row)
253 }
254
255 pub async fn list(&self, owner_key: &str) -> Result<Vec<Deployment>, Error> {
257 let backend = self.db.get_database_backend();
258 let p1 = ph(backend, 1)?;
259 let sql = format!(
260 "SELECT id, identifier, owner_key, source_ref, artifact_location, byte_size, \
261 status, artifact_deleted_at, terminated_at, created_at \
262 FROM deployments WHERE owner_key = {p1} ORDER BY id DESC"
263 );
264 let stmt = Statement::from_sql_and_values(
265 backend,
266 &sql,
267 [Value::String(Some(Box::new(owner_key.to_string())))],
268 );
269 let rows = self.db.query_all(stmt).await.map_err(Error::Db)?;
270 rows.iter().map(parse_deployment_row).collect()
271 }
272
273 pub async fn active(&self, owner_key: &str) -> Result<Option<Deployment>, Error> {
276 let backend = self.db.get_database_backend();
277 let p1 = ph(backend, 1)?;
278 let sql = format!("SELECT deployment_id FROM deployment_pointers WHERE owner_key = {p1}");
279 let stmt = Statement::from_sql_and_values(
280 backend,
281 &sql,
282 [Value::String(Some(Box::new(owner_key.to_string())))],
283 );
284 let row = self.db.query_one(stmt).await.map_err(Error::Db)?;
285 match row {
286 None => Ok(None),
287 Some(r) => {
288 let deployment_id: i64 = r
289 .try_get_by::<i64, _>("deployment_id")
290 .map_err(|e| Error::custom(format!("active: parse deployment_id: {e}")))?;
291 self.get(deployment_id).await.map(Some)
292 }
293 }
294 }
295
296 pub async fn promote(&self, owner_key: &str, deployment_id: i64) -> Result<Option<i64>, Error> {
306 let dep = self.get(deployment_id).await?;
307 if dep.status != DeploymentStatus::Ready {
308 return Err(Error::NotReady { id: deployment_id });
309 }
310 if dep.artifact_deleted_at.is_some() {
311 return Err(Error::ArtifactDeleted { id: deployment_id });
312 }
313 crate::promote::promote(&self.db, owner_key, deployment_id).await
314 }
315
316 pub async fn rollback(&self, owner_key: &str) -> Result<Option<i64>, Error> {
321 let backend = self.db.get_database_backend();
322 let p1 = ph(backend, 1)?;
323 let sql = format!(
324 "SELECT previous_deployment_id FROM deployment_pointers WHERE owner_key = {p1}"
325 );
326 let stmt = Statement::from_sql_and_values(
327 backend,
328 &sql,
329 [Value::String(Some(Box::new(owner_key.to_string())))],
330 );
331 let row = self.db.query_one(stmt).await.map_err(Error::Db)?;
332 let previous_id = match row {
333 None => {
334 return Err(Error::NoPreviousDeployment {
335 owner_key: owner_key.to_string(),
336 })
337 }
338 Some(r) => {
339 let opt: Option<i64> = r
340 .try_get_by::<Option<i64>, _>("previous_deployment_id")
341 .map_err(|e| {
342 Error::custom(format!("rollback: parse previous_deployment_id: {e}"))
343 })?;
344 opt
345 }
346 };
347 match previous_id {
348 None => Err(Error::NoPreviousDeployment {
349 owner_key: owner_key.to_string(),
350 }),
351 Some(prev_id) => self.promote(owner_key, prev_id).await,
352 }
353 }
354}
355
356fn parse_deployment_row(row: &sea_orm::QueryResult) -> Result<Deployment, Error> {
361 let id: i64 = row
362 .try_get_by::<i64, _>("id")
363 .map_err(|e| Error::custom(format!("parse id: {e}")))?;
364 let identifier: String = row
365 .try_get_by::<String, _>("identifier")
366 .map_err(|e| Error::custom(format!("parse identifier: {e}")))?;
367 let owner_key: String = row
368 .try_get_by::<String, _>("owner_key")
369 .map_err(|e| Error::custom(format!("parse owner_key: {e}")))?;
370 let source_ref: Option<String> = row
371 .try_get_by::<Option<String>, _>("source_ref")
372 .map_err(|e| Error::custom(format!("parse source_ref: {e}")))?;
373 let artifact_location: Option<String> = row
374 .try_get_by::<Option<String>, _>("artifact_location")
375 .map_err(|e| Error::custom(format!("parse artifact_location: {e}")))?;
376 let byte_size: Option<i64> = row
377 .try_get_by::<Option<i64>, _>("byte_size")
378 .map_err(|e| Error::custom(format!("parse byte_size: {e}")))?;
379 let status_str: String = row
380 .try_get_by::<String, _>("status")
381 .map_err(|e| Error::custom(format!("parse status: {e}")))?;
382 let status = DeploymentStatus::from_str(&status_str)?;
383 let artifact_deleted_at = parse_optional_timestamp(row, "artifact_deleted_at")?;
384 let terminated_at = parse_optional_timestamp(row, "terminated_at")?;
385 let created_at = parse_timestamp(row, "created_at")?;
386
387 Ok(Deployment {
388 id,
389 identifier,
390 owner_key,
391 source_ref,
392 artifact_location,
393 byte_size,
394 status,
395 artifact_deleted_at,
396 terminated_at,
397 created_at,
398 })
399}
400
401fn parse_timestamp(row: &sea_orm::QueryResult, col: &str) -> Result<DateTime<Utc>, Error> {
403 if let Ok(dt) = row.try_get_by::<DateTime<Utc>, _>(col) {
404 return Ok(dt);
405 }
406 let s: String = row
407 .try_get_by::<String, _>(col)
408 .map_err(|e| Error::custom(format!("parse {col}: {e}")))?;
409 DateTime::parse_from_rfc3339(&s)
410 .map(|dt| dt.with_timezone(&Utc))
411 .map_err(|e| Error::custom(format!("parse {col} as rfc3339 ('{s}'): {e}")))
412}
413
414fn parse_optional_timestamp(
416 row: &sea_orm::QueryResult,
417 col: &str,
418) -> Result<Option<DateTime<Utc>>, Error> {
419 if let Ok(opt) = row.try_get_by::<Option<DateTime<Utc>>, _>(col) {
420 return Ok(opt);
421 }
422 let s: Option<String> = row
423 .try_get_by::<Option<String>, _>(col)
424 .map_err(|e| Error::custom(format!("parse {col}: {e}")))?;
425 match s {
426 None => Ok(None),
427 Some(s) => DateTime::parse_from_rfc3339(&s)
428 .map(|dt| Some(dt.with_timezone(&Utc)))
429 .map_err(|e| Error::custom(format!("parse {col} as rfc3339 ('{s}'): {e}"))),
430 }
431}
432
433fn ph(backend: DatabaseBackend, n: usize) -> Result<String, Error> {
438 match backend {
439 DatabaseBackend::Postgres => Ok(format!("${n}")),
440 DatabaseBackend::Sqlite => Ok(format!("?{n}")),
441 _ => Err(Error::UnsupportedBackend),
442 }
443}
444
445#[cfg(test)]
450mod tests {
451 use super::*;
452 use sea_orm::Database;
453 use sea_orm_migration::MigratorTrait;
454
455 struct TestMigrator;
456
457 #[async_trait::async_trait]
458 impl MigratorTrait for TestMigrator {
459 fn migrations() -> Vec<Box<dyn sea_orm_migration::MigrationTrait>> {
460 vec![
461 Box::new(crate::migration::CreateDeploymentsTable),
462 Box::new(crate::migration::CreateDeploymentPointersTable),
463 ]
464 }
465 }
466
467 async fn setup() -> DatabaseConnection {
468 let conn = Database::connect("sqlite::memory:")
469 .await
470 .expect("connect sqlite::memory:");
471 TestMigrator::up(&conn, None).await.expect("run migrations");
472 conn
473 }
474
475 #[tokio::test]
476 async fn create_sets_building() {
477 let conn = setup().await;
478 let d = Deployments::new(conn);
479 let dep = d
480 .create("owner:1", Some("abc123"))
481 .await
482 .expect("create failed");
483 assert_eq!(dep.status, DeploymentStatus::Building);
484 assert_eq!(dep.owner_key, "owner:1");
485 assert_eq!(dep.source_ref.as_deref(), Some("abc123"));
486 assert!(!dep.identifier.is_empty(), "identifier must be set");
487 assert!(dep.artifact_location.is_none());
488 assert!(dep.byte_size.is_none());
489 }
490
491 #[tokio::test]
492 async fn get_round_trips() {
493 let conn = setup().await;
494 let d = Deployments::new(conn);
495 let dep = d.create("owner:2", Some("ref-xyz")).await.expect("create");
496 let fetched = d.get(dep.id).await.expect("get");
497 assert_eq!(fetched.id, dep.id);
498 assert_eq!(fetched.identifier, dep.identifier);
499 assert_eq!(fetched.owner_key, "owner:2");
500 assert_eq!(fetched.source_ref.as_deref(), Some("ref-xyz"));
501 assert_eq!(fetched.status, DeploymentStatus::Building);
502 }
503
504 #[tokio::test]
505 async fn mark_ready_transitions() {
506 let conn = setup().await;
507 let d = Deployments::new(conn);
508 let dep = d.create("owner:3", None).await.expect("create");
509 d.mark_ready(dep.id, "deployments/1/", 4096)
510 .await
511 .expect("mark_ready failed");
512 let fetched = d.get(dep.id).await.expect("get");
513 assert_eq!(fetched.status, DeploymentStatus::Ready);
514 assert_eq!(fetched.artifact_location.as_deref(), Some("deployments/1/"));
515 assert_eq!(fetched.byte_size, Some(4096));
516 assert!(
517 fetched.terminated_at.is_some(),
518 "terminated_at must be set after mark_ready"
519 );
520 }
521
522 #[tokio::test]
523 async fn mark_ready_rejects_terminal() {
524 let conn = setup().await;
525 let d = Deployments::new(conn);
526 let dep = d.create("owner:4", None).await.expect("create");
527 d.mark_ready(dep.id, "path/", 100)
528 .await
529 .expect("first mark_ready");
530 let result = d.mark_ready(dep.id, "path2/", 200).await;
532 assert!(
533 result.is_err(),
534 "mark_ready on terminal row should return Err"
535 );
536 }
537
538 #[tokio::test]
539 async fn mark_failed_transitions() {
540 let conn = setup().await;
541 let d = Deployments::new(conn);
542 let dep = d.create("owner:5", None).await.expect("create");
543 d.mark_failed(dep.id, "build exploded")
544 .await
545 .expect("mark_failed");
546 let fetched = d.get(dep.id).await.expect("get");
547 assert_eq!(fetched.status, DeploymentStatus::Failed);
548 assert!(
549 fetched.terminated_at.is_some(),
550 "terminated_at must be set after mark_failed"
551 );
552 }
553
554 #[tokio::test]
555 async fn list_returns_owner_rows() {
556 let conn = setup().await;
557 let d = Deployments::new(conn);
558 d.create("owner:6", None).await.expect("create 1");
559 d.create("owner:6", None).await.expect("create 2");
560 d.create("other-owner", None).await.expect("create other");
561
562 let rows = d.list("owner:6").await.expect("list");
563 assert_eq!(rows.len(), 2, "list should return 2 rows for owner:6");
564
565 let other_rows = d.list("other-owner").await.expect("list other");
566 assert_eq!(other_rows.len(), 1);
567 }
568
569 #[tokio::test]
570 async fn active_returns_none_without_pointer() {
571 let conn = setup().await;
572 let d = Deployments::new(conn);
573 let result = d.active("owner:7").await.expect("active");
574 assert!(
575 result.is_none(),
576 "active should be None when no pointer exists"
577 );
578 }
579
580 #[tokio::test]
581 async fn promote_rejects_non_ready() {
582 let conn = setup().await;
583 let d = Deployments::new(conn);
584 let dep = d.create("owner:8", None).await.expect("create");
585 let result = d.promote("owner:8", dep.id).await;
587 assert!(
588 matches!(result, Err(Error::NotReady { id }) if id == dep.id),
589 "promote should return Error::NotReady for a Building deployment, got: {result:?}"
590 );
591 }
592
593 #[tokio::test]
594 async fn promote_rejects_deleted_artifact() {
595 let conn = setup().await;
596 let d = Deployments::new(conn.clone());
597 let dep = d.create("owner:9", None).await.expect("create");
598 d.mark_ready(dep.id, "artifacts/9/", 512)
599 .await
600 .expect("mark_ready");
601 let now_iso = chrono::Utc::now().to_rfc3339();
603 conn.execute(sea_orm::Statement::from_string(
604 sea_orm::DatabaseBackend::Sqlite,
605 format!(
606 "UPDATE deployments SET artifact_deleted_at = '{}' WHERE id = {}",
607 now_iso, dep.id
608 ),
609 ))
610 .await
611 .expect("set artifact_deleted_at");
612 let result = d.promote("owner:9", dep.id).await;
613 assert!(
614 matches!(result, Err(Error::ArtifactDeleted { id }) if id == dep.id),
615 "promote should return Error::ArtifactDeleted, got: {result:?}"
616 );
617 }
618
619 #[tokio::test]
620 async fn promote_returns_previous_id() {
621 let conn = setup().await;
622 let d = Deployments::new(conn);
623 let dep_a = d.create("owner:10", None).await.expect("create a");
624 let dep_b = d.create("owner:10", None).await.expect("create b");
625 d.mark_ready(dep_a.id, "a/", 1).await.expect("ready a");
626 d.mark_ready(dep_b.id, "b/", 2).await.expect("ready b");
627
628 let prev = d
630 .promote("owner:10", dep_a.id)
631 .await
632 .expect("first promote");
633 assert!(
634 prev.is_none(),
635 "first promote should return None as previous"
636 );
637
638 let prev2 = d
640 .promote("owner:10", dep_b.id)
641 .await
642 .expect("second promote");
643 assert_eq!(
644 prev2,
645 Some(dep_a.id),
646 "second promote should return dep_a.id as previous"
647 );
648 }
649
650 #[tokio::test]
651 async fn rollback_promotes_previous() {
652 let conn = setup().await;
653 let d = Deployments::new(conn);
654 let dep_a = d.create("owner:11", None).await.expect("create a");
655 let dep_b = d.create("owner:11", None).await.expect("create b");
656 d.mark_ready(dep_a.id, "a/", 1).await.expect("ready a");
657 d.mark_ready(dep_b.id, "b/", 2).await.expect("ready b");
658 d.promote("owner:11", dep_a.id).await.expect("promote a");
659 d.promote("owner:11", dep_b.id).await.expect("promote b");
660
661 d.rollback("owner:11").await.expect("rollback");
663 let active = d
664 .active("owner:11")
665 .await
666 .expect("active after rollback")
667 .expect("should have active deployment");
668 assert_eq!(
669 active.id, dep_a.id,
670 "after rollback, active should be dep_a"
671 );
672 }
673}