1use chrono::{DateTime, Utc};
7use sea_orm::{ConnectionTrait, DatabaseBackend, DatabaseConnection, Statement, Value};
8use serde::{Deserialize, Serialize};
9
10use crate::error::Error;
11
12#[derive(Debug, Clone, Serialize, Deserialize)]
18pub struct Deployment {
19 pub id: i64,
21 pub identifier: String,
23 pub owner_key: String,
25 pub source_ref: Option<String>,
27 pub artifact_location: Option<String>,
29 pub byte_size: Option<i64>,
31 pub status: DeploymentStatus,
33 pub artifact_deleted_at: Option<DateTime<Utc>>,
36 pub terminated_at: Option<DateTime<Utc>>,
38 pub created_at: DateTime<Utc>,
40}
41
42#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
48#[serde(rename_all = "snake_case")]
49pub enum DeploymentStatus {
50 Building,
52 Ready,
54 Failed,
56}
57
58impl DeploymentStatus {
59 fn from_str(s: &str) -> Result<Self, Error> {
60 match s {
61 "building" => Ok(Self::Building),
62 "ready" => Ok(Self::Ready),
63 "failed" => Ok(Self::Failed),
64 other => Err(Error::custom(format!("unknown deployment status: {other}"))),
65 }
66 }
67}
68
69#[derive(Clone)]
78pub struct Deployments {
79 db: DatabaseConnection,
80}
81
82impl Deployments {
83 pub fn new(db: DatabaseConnection) -> Self {
85 Self { db }
86 }
87
88 pub async fn create(
92 &self,
93 owner_key: &str,
94 source_ref: Option<&str>,
95 ) -> Result<Deployment, Error> {
96 let identifier = uuid::Uuid::new_v4().to_string();
97 let now = Utc::now();
98 let backend = self.db.get_database_backend();
99
100 let (p1, p2, p3, p4) = (
101 ph(backend, 1)?,
102 ph(backend, 2)?,
103 ph(backend, 3)?,
104 ph(backend, 4)?,
105 );
106 let sql = format!(
107 "INSERT INTO deployments \
108 (identifier, owner_key, source_ref, artifact_location, byte_size, status, \
109 artifact_deleted_at, terminated_at, created_at) \
110 VALUES ({p1}, {p2}, {p3}, NULL, NULL, 'building', NULL, NULL, {p4}) \
111 RETURNING id"
112 );
113 let stmt = Statement::from_sql_and_values(
114 backend,
115 &sql,
116 [
117 Value::String(Some(Box::new(identifier.clone()))),
118 Value::String(Some(Box::new(owner_key.to_string()))),
119 source_ref.map_or(Value::String(None), |s| {
120 Value::String(Some(Box::new(s.to_string())))
121 }),
122 Value::ChronoDateTimeUtc(Some(Box::new(now))),
123 ],
124 );
125
126 let row = self
127 .db
128 .query_one(stmt)
129 .await
130 .map_err(Error::Db)?
131 .ok_or_else(|| Error::custom("create: INSERT RETURNING returned no row"))?;
132
133 let id: i64 = row
134 .try_get_by::<i64, _>("id")
135 .map_err(|e| Error::custom(format!("create: parse id: {e}")))?;
136
137 Ok(Deployment {
138 id,
139 identifier,
140 owner_key: owner_key.to_string(),
141 source_ref: source_ref.map(str::to_string),
142 artifact_location: None,
143 byte_size: None,
144 status: DeploymentStatus::Building,
145 artifact_deleted_at: None,
146 terminated_at: None,
147 created_at: now,
148 })
149 }
150
151 pub async fn mark_ready(
157 &self,
158 id: i64,
159 artifact_location: &str,
160 byte_size: i64,
161 ) -> Result<(), Error> {
162 let now = Utc::now();
163 let backend = self.db.get_database_backend();
164 let (p1, p2, p3, p4) = (
165 ph(backend, 1)?,
166 ph(backend, 2)?,
167 ph(backend, 3)?,
168 ph(backend, 4)?,
169 );
170 let sql = format!(
171 "UPDATE deployments \
172 SET status = 'ready', artifact_location = {p1}, byte_size = {p2}, terminated_at = {p3} \
173 WHERE id = {p4} AND status = 'building'"
174 );
175 let stmt = Statement::from_sql_and_values(
176 backend,
177 &sql,
178 [
179 Value::String(Some(Box::new(artifact_location.to_string()))),
180 Value::BigInt(Some(byte_size)),
181 Value::ChronoDateTimeUtc(Some(Box::new(now))),
182 Value::BigInt(Some(id)),
183 ],
184 );
185 let result = self.db.execute(stmt).await.map_err(Error::Db)?;
186 if result.rows_affected() == 0 {
187 self.get(id).await?;
191 return Err(Error::custom(format!(
192 "deployment {id} is not in building state; transition to ready rejected"
193 )));
194 }
195 Ok(())
196 }
197
198 pub async fn mark_failed(&self, id: i64, error: &str) -> Result<(), Error> {
204 tracing::warn!(
205 deployment_id = id,
206 error = error,
207 "deployment marked failed"
208 );
209 let now = Utc::now();
210 let backend = self.db.get_database_backend();
211 let (p1, p2) = (ph(backend, 1)?, ph(backend, 2)?);
212 let sql = format!(
213 "UPDATE deployments \
214 SET status = 'failed', terminated_at = {p1} \
215 WHERE id = {p2} AND status = 'building'"
216 );
217 let stmt = Statement::from_sql_and_values(
218 backend,
219 &sql,
220 [
221 Value::ChronoDateTimeUtc(Some(Box::new(now))),
222 Value::BigInt(Some(id)),
223 ],
224 );
225 let result = self.db.execute(stmt).await.map_err(Error::Db)?;
226 if result.rows_affected() == 0 {
227 self.get(id).await?;
228 return Err(Error::custom(format!(
229 "deployment {id} is not in building state; transition to failed rejected"
230 )));
231 }
232 Ok(())
233 }
234
235 pub async fn get(&self, id: i64) -> Result<Deployment, Error> {
237 let backend = self.db.get_database_backend();
238 let p1 = ph(backend, 1)?;
239 let sql = format!(
240 "SELECT id, identifier, owner_key, source_ref, artifact_location, byte_size, \
241 status, artifact_deleted_at, terminated_at, created_at \
242 FROM deployments WHERE id = {p1}"
243 );
244 let stmt = Statement::from_sql_and_values(backend, &sql, [Value::BigInt(Some(id))]);
245 let row = self
246 .db
247 .query_one(stmt)
248 .await
249 .map_err(Error::Db)?
250 .ok_or(Error::NotFound { id })?;
251 parse_deployment_row(&row)
252 }
253
254 pub async fn list(&self, owner_key: &str) -> Result<Vec<Deployment>, Error> {
256 let backend = self.db.get_database_backend();
257 let p1 = ph(backend, 1)?;
258 let sql = format!(
259 "SELECT id, identifier, owner_key, source_ref, artifact_location, byte_size, \
260 status, artifact_deleted_at, terminated_at, created_at \
261 FROM deployments WHERE owner_key = {p1} ORDER BY id DESC"
262 );
263 let stmt = Statement::from_sql_and_values(
264 backend,
265 &sql,
266 [Value::String(Some(Box::new(owner_key.to_string())))],
267 );
268 let rows = self.db.query_all(stmt).await.map_err(Error::Db)?;
269 rows.iter().map(parse_deployment_row).collect()
270 }
271
272 pub async fn active(&self, owner_key: &str) -> Result<Option<Deployment>, Error> {
275 let backend = self.db.get_database_backend();
276 let p1 = ph(backend, 1)?;
277 let sql = format!("SELECT deployment_id FROM deployment_pointers WHERE owner_key = {p1}");
278 let stmt = Statement::from_sql_and_values(
279 backend,
280 &sql,
281 [Value::String(Some(Box::new(owner_key.to_string())))],
282 );
283 let row = self.db.query_one(stmt).await.map_err(Error::Db)?;
284 match row {
285 None => Ok(None),
286 Some(r) => {
287 let deployment_id: i64 = r
288 .try_get_by::<i64, _>("deployment_id")
289 .map_err(|e| Error::custom(format!("active: parse deployment_id: {e}")))?;
290 self.get(deployment_id).await.map(Some)
291 }
292 }
293 }
294
295 pub async fn promote(&self, owner_key: &str, deployment_id: i64) -> Result<Option<i64>, Error> {
305 let dep = self.get(deployment_id).await?;
306 if dep.status != DeploymentStatus::Ready {
307 return Err(Error::NotReady { id: deployment_id });
308 }
309 if dep.artifact_deleted_at.is_some() {
310 return Err(Error::ArtifactDeleted { id: deployment_id });
311 }
312 crate::promote::promote(&self.db, owner_key, deployment_id).await
313 }
314
315 pub async fn rollback(&self, owner_key: &str) -> Result<Option<i64>, Error> {
320 let backend = self.db.get_database_backend();
321 let p1 = ph(backend, 1)?;
322 let sql = format!(
323 "SELECT previous_deployment_id FROM deployment_pointers WHERE owner_key = {p1}"
324 );
325 let stmt = Statement::from_sql_and_values(
326 backend,
327 &sql,
328 [Value::String(Some(Box::new(owner_key.to_string())))],
329 );
330 let row = self.db.query_one(stmt).await.map_err(Error::Db)?;
331 let previous_id = match row {
332 None => {
333 return Err(Error::NoPreviousDeployment {
334 owner_key: owner_key.to_string(),
335 })
336 }
337 Some(r) => {
338 let opt: Option<i64> = r
339 .try_get_by::<Option<i64>, _>("previous_deployment_id")
340 .map_err(|e| {
341 Error::custom(format!("rollback: parse previous_deployment_id: {e}"))
342 })?;
343 opt
344 }
345 };
346 match previous_id {
347 None => Err(Error::NoPreviousDeployment {
348 owner_key: owner_key.to_string(),
349 }),
350 Some(prev_id) => self.promote(owner_key, prev_id).await,
351 }
352 }
353}
354
355fn parse_deployment_row(row: &sea_orm::QueryResult) -> Result<Deployment, Error> {
360 let id: i64 = row
361 .try_get_by::<i64, _>("id")
362 .map_err(|e| Error::custom(format!("parse id: {e}")))?;
363 let identifier: String = row
364 .try_get_by::<String, _>("identifier")
365 .map_err(|e| Error::custom(format!("parse identifier: {e}")))?;
366 let owner_key: String = row
367 .try_get_by::<String, _>("owner_key")
368 .map_err(|e| Error::custom(format!("parse owner_key: {e}")))?;
369 let source_ref: Option<String> = row
370 .try_get_by::<Option<String>, _>("source_ref")
371 .map_err(|e| Error::custom(format!("parse source_ref: {e}")))?;
372 let artifact_location: Option<String> = row
373 .try_get_by::<Option<String>, _>("artifact_location")
374 .map_err(|e| Error::custom(format!("parse artifact_location: {e}")))?;
375 let byte_size: Option<i64> = row
376 .try_get_by::<Option<i64>, _>("byte_size")
377 .map_err(|e| Error::custom(format!("parse byte_size: {e}")))?;
378 let status_str: String = row
379 .try_get_by::<String, _>("status")
380 .map_err(|e| Error::custom(format!("parse status: {e}")))?;
381 let status = DeploymentStatus::from_str(&status_str)?;
382 let artifact_deleted_at = parse_optional_timestamp(row, "artifact_deleted_at")?;
383 let terminated_at = parse_optional_timestamp(row, "terminated_at")?;
384 let created_at = parse_timestamp(row, "created_at")?;
385
386 Ok(Deployment {
387 id,
388 identifier,
389 owner_key,
390 source_ref,
391 artifact_location,
392 byte_size,
393 status,
394 artifact_deleted_at,
395 terminated_at,
396 created_at,
397 })
398}
399
400fn parse_timestamp(row: &sea_orm::QueryResult, col: &str) -> Result<DateTime<Utc>, Error> {
402 if let Ok(dt) = row.try_get_by::<DateTime<Utc>, _>(col) {
403 return Ok(dt);
404 }
405 let s: String = row
406 .try_get_by::<String, _>(col)
407 .map_err(|e| Error::custom(format!("parse {col}: {e}")))?;
408 DateTime::parse_from_rfc3339(&s)
409 .map(|dt| dt.with_timezone(&Utc))
410 .map_err(|e| Error::custom(format!("parse {col} as rfc3339 ('{s}'): {e}")))
411}
412
413fn parse_optional_timestamp(
415 row: &sea_orm::QueryResult,
416 col: &str,
417) -> Result<Option<DateTime<Utc>>, Error> {
418 if let Ok(opt) = row.try_get_by::<Option<DateTime<Utc>>, _>(col) {
419 return Ok(opt);
420 }
421 let s: Option<String> = row
422 .try_get_by::<Option<String>, _>(col)
423 .map_err(|e| Error::custom(format!("parse {col}: {e}")))?;
424 match s {
425 None => Ok(None),
426 Some(s) => DateTime::parse_from_rfc3339(&s)
427 .map(|dt| Some(dt.with_timezone(&Utc)))
428 .map_err(|e| Error::custom(format!("parse {col} as rfc3339 ('{s}'): {e}"))),
429 }
430}
431
432fn ph(backend: DatabaseBackend, n: usize) -> Result<String, Error> {
437 match backend {
438 DatabaseBackend::Postgres => Ok(format!("${n}")),
439 DatabaseBackend::Sqlite => Ok(format!("?{n}")),
440 _ => Err(Error::UnsupportedBackend),
441 }
442}
443
444#[cfg(test)]
449mod tests {
450 use super::*;
451 use sea_orm::Database;
452 use sea_orm_migration::MigratorTrait;
453
454 struct TestMigrator;
455
456 #[async_trait::async_trait]
457 impl MigratorTrait for TestMigrator {
458 fn migrations() -> Vec<Box<dyn sea_orm_migration::MigrationTrait>> {
459 vec![
460 Box::new(crate::migration::CreateDeploymentsTable),
461 Box::new(crate::migration::CreateDeploymentPointersTable),
462 ]
463 }
464 }
465
466 async fn setup() -> DatabaseConnection {
467 let conn = Database::connect("sqlite::memory:")
468 .await
469 .expect("connect sqlite::memory:");
470 TestMigrator::up(&conn, None).await.expect("run migrations");
471 conn
472 }
473
474 #[tokio::test]
475 async fn create_sets_building() {
476 let conn = setup().await;
477 let d = Deployments::new(conn);
478 let dep = d
479 .create("owner:1", Some("abc123"))
480 .await
481 .expect("create failed");
482 assert_eq!(dep.status, DeploymentStatus::Building);
483 assert_eq!(dep.owner_key, "owner:1");
484 assert_eq!(dep.source_ref.as_deref(), Some("abc123"));
485 assert!(!dep.identifier.is_empty(), "identifier must be set");
486 assert!(dep.artifact_location.is_none());
487 assert!(dep.byte_size.is_none());
488 }
489
490 #[tokio::test]
491 async fn get_round_trips() {
492 let conn = setup().await;
493 let d = Deployments::new(conn);
494 let dep = d.create("owner:2", Some("ref-xyz")).await.expect("create");
495 let fetched = d.get(dep.id).await.expect("get");
496 assert_eq!(fetched.id, dep.id);
497 assert_eq!(fetched.identifier, dep.identifier);
498 assert_eq!(fetched.owner_key, "owner:2");
499 assert_eq!(fetched.source_ref.as_deref(), Some("ref-xyz"));
500 assert_eq!(fetched.status, DeploymentStatus::Building);
501 }
502
503 #[tokio::test]
504 async fn mark_ready_transitions() {
505 let conn = setup().await;
506 let d = Deployments::new(conn);
507 let dep = d.create("owner:3", None).await.expect("create");
508 d.mark_ready(dep.id, "deployments/1/", 4096)
509 .await
510 .expect("mark_ready failed");
511 let fetched = d.get(dep.id).await.expect("get");
512 assert_eq!(fetched.status, DeploymentStatus::Ready);
513 assert_eq!(fetched.artifact_location.as_deref(), Some("deployments/1/"));
514 assert_eq!(fetched.byte_size, Some(4096));
515 assert!(
516 fetched.terminated_at.is_some(),
517 "terminated_at must be set after mark_ready"
518 );
519 }
520
521 #[tokio::test]
522 async fn mark_ready_rejects_terminal() {
523 let conn = setup().await;
524 let d = Deployments::new(conn);
525 let dep = d.create("owner:4", None).await.expect("create");
526 d.mark_ready(dep.id, "path/", 100)
527 .await
528 .expect("first mark_ready");
529 let result = d.mark_ready(dep.id, "path2/", 200).await;
531 assert!(
532 result.is_err(),
533 "mark_ready on terminal row should return Err"
534 );
535 }
536
537 #[tokio::test]
538 async fn mark_failed_transitions() {
539 let conn = setup().await;
540 let d = Deployments::new(conn);
541 let dep = d.create("owner:5", None).await.expect("create");
542 d.mark_failed(dep.id, "build exploded")
543 .await
544 .expect("mark_failed");
545 let fetched = d.get(dep.id).await.expect("get");
546 assert_eq!(fetched.status, DeploymentStatus::Failed);
547 assert!(
548 fetched.terminated_at.is_some(),
549 "terminated_at must be set after mark_failed"
550 );
551 }
552
553 #[tokio::test]
554 async fn list_returns_owner_rows() {
555 let conn = setup().await;
556 let d = Deployments::new(conn);
557 d.create("owner:6", None).await.expect("create 1");
558 d.create("owner:6", None).await.expect("create 2");
559 d.create("other-owner", None).await.expect("create other");
560
561 let rows = d.list("owner:6").await.expect("list");
562 assert_eq!(rows.len(), 2, "list should return 2 rows for owner:6");
563
564 let other_rows = d.list("other-owner").await.expect("list other");
565 assert_eq!(other_rows.len(), 1);
566 }
567
568 #[tokio::test]
569 async fn active_returns_none_without_pointer() {
570 let conn = setup().await;
571 let d = Deployments::new(conn);
572 let result = d.active("owner:7").await.expect("active");
573 assert!(
574 result.is_none(),
575 "active should be None when no pointer exists"
576 );
577 }
578
579 #[tokio::test]
580 async fn promote_rejects_non_ready() {
581 let conn = setup().await;
582 let d = Deployments::new(conn);
583 let dep = d.create("owner:8", None).await.expect("create");
584 let result = d.promote("owner:8", dep.id).await;
586 assert!(
587 matches!(result, Err(Error::NotReady { id }) if id == dep.id),
588 "promote should return Error::NotReady for a Building deployment, got: {result:?}"
589 );
590 }
591
592 #[tokio::test]
593 async fn promote_rejects_deleted_artifact() {
594 let conn = setup().await;
595 let d = Deployments::new(conn.clone());
596 let dep = d.create("owner:9", None).await.expect("create");
597 d.mark_ready(dep.id, "artifacts/9/", 512)
598 .await
599 .expect("mark_ready");
600 let now_iso = chrono::Utc::now().to_rfc3339();
602 conn.execute(sea_orm::Statement::from_string(
603 sea_orm::DatabaseBackend::Sqlite,
604 format!(
605 "UPDATE deployments SET artifact_deleted_at = '{}' WHERE id = {}",
606 now_iso, dep.id
607 ),
608 ))
609 .await
610 .expect("set artifact_deleted_at");
611 let result = d.promote("owner:9", dep.id).await;
612 assert!(
613 matches!(result, Err(Error::ArtifactDeleted { id }) if id == dep.id),
614 "promote should return Error::ArtifactDeleted, got: {result:?}"
615 );
616 }
617
618 #[tokio::test]
619 async fn promote_returns_previous_id() {
620 let conn = setup().await;
621 let d = Deployments::new(conn);
622 let dep_a = d.create("owner:10", None).await.expect("create a");
623 let dep_b = d.create("owner:10", None).await.expect("create b");
624 d.mark_ready(dep_a.id, "a/", 1).await.expect("ready a");
625 d.mark_ready(dep_b.id, "b/", 2).await.expect("ready b");
626
627 let prev = d
629 .promote("owner:10", dep_a.id)
630 .await
631 .expect("first promote");
632 assert!(
633 prev.is_none(),
634 "first promote should return None as previous"
635 );
636
637 let prev2 = d
639 .promote("owner:10", dep_b.id)
640 .await
641 .expect("second promote");
642 assert_eq!(
643 prev2,
644 Some(dep_a.id),
645 "second promote should return dep_a.id as previous"
646 );
647 }
648
649 #[tokio::test]
650 async fn rollback_promotes_previous() {
651 let conn = setup().await;
652 let d = Deployments::new(conn);
653 let dep_a = d.create("owner:11", None).await.expect("create a");
654 let dep_b = d.create("owner:11", None).await.expect("create b");
655 d.mark_ready(dep_a.id, "a/", 1).await.expect("ready a");
656 d.mark_ready(dep_b.id, "b/", 2).await.expect("ready b");
657 d.promote("owner:11", dep_a.id).await.expect("promote a");
658 d.promote("owner:11", dep_b.id).await.expect("promote b");
659
660 d.rollback("owner:11").await.expect("rollback");
662 let active = d
663 .active("owner:11")
664 .await
665 .expect("active after rollback")
666 .expect("should have active deployment");
667 assert_eq!(
668 active.id, dep_a.id,
669 "after rollback, active should be dep_a"
670 );
671 }
672}