mongodb_migrator/
server.rs1use std::{net::SocketAddr, sync::Arc};
2
3use axum::{
4 extract::{Path, State},
5 http::StatusCode,
6 routing::post,
7 Router,
8};
9use tokio::sync::Mutex;
10use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
11
12use crate::{
13 migration::Migration,
14 migrator::{default::DefaultMigrator, with_migrations_vec::WithMigrationsVec},
15};
16
17struct AppState {
18 migrator: WithMigrationsVec,
19}
20
21type SharedState = Arc<Mutex<AppState>>;
22
23pub struct ServiceParams {
24 pub migrator: MigratorParams,
25 pub server: ServerParams,
26}
27
28pub struct MigratorParams {
29 pub db: DbParams,
30 pub migrations: Vec<Box<dyn Migration>>,
31}
32
33pub struct ServerParams {
34 port: u16,
35}
36
37impl Default for ServiceParams {
38 fn default() -> Self {
39 Self {
40 migrator: MigratorParams {
41 db: DbParams {
42 connection_string: "mongodb://localhost:27017".to_string(),
43 log_into_db_name: "test".to_string(),
44 },
45 migrations: vec![],
46 },
47 server: ServerParams { port: 3000 },
48 }
49 }
50}
51
52pub struct DbParams {
53 pub connection_string: String,
54 pub log_into_db_name: String,
55}
56
57pub async fn server(params: ServiceParams) {
58 init_tracing();
59
60 let migrator = init_migrator(params.migrator).await;
61
62 run_server(
63 init_routing(Arc::new(Mutex::new(AppState { migrator }))),
64 params.server.port,
65 )
66 .await;
67}
68
69fn ups() -> Router<SharedState> {
70 Router::new().route("/:id", post(up_migration_with_id))
71}
72
73fn downs() -> Router<SharedState> {
74 Router::new().route("/:id", post(down_migration_with_id))
75}
76
77async fn up_migration_with_id(
78 Path(id): Path<String>,
79 State(state): State<SharedState>,
80) -> StatusCode {
81 let r = state.lock().await.migrator.up_single_from_vec(id).await;
82
83 if r.is_ok() {
84 StatusCode::OK
85 } else {
86 StatusCode::INTERNAL_SERVER_ERROR
87 }
88}
89
90async fn down_migration_with_id(
91 Path(id): Path<String>,
92 State(state): State<SharedState>,
93) -> StatusCode {
94 let r = state.lock().await.migrator.down_single_from_vec(id).await;
95
96 if r.is_ok() {
97 StatusCode::OK
98 } else {
99 StatusCode::INTERNAL_SERVER_ERROR
100 }
101}
102
103fn init_tracing() {
104 tracing_subscriber::registry()
105 .with(
106 tracing_subscriber::EnvFilter::try_from_default_env()
107 .unwrap_or_else(|_| "mongodb-migrator=debug,tower_http=debug".into()),
108 )
109 .with(tracing_subscriber::fmt::layer())
110 .init();
111}
112
113fn init_routing(shared_state: SharedState) -> Router {
114 Router::new()
115 .nest("/up", ups())
116 .nest("/down", downs())
117 .with_state(shared_state)
118}
119
120async fn init_migrator(params: MigratorParams) -> WithMigrationsVec {
121 DefaultMigrator::new()
122 .with_conn(
123 mongodb::Client::with_uri_str(params.db.connection_string)
124 .await
125 .expect("mongodb client created")
126 .database(¶ms.db.log_into_db_name),
127 )
128 .with_migrations_vec(params.migrations)
129}
130
131async fn run_server(router: Router, port: u16) {
132 let addr = SocketAddr::from(([127, 0, 0, 1], port));
133
134 tracing::debug!("listening on {}", addr);
135
136 axum::Server::bind(&addr)
137 .serve(router.into_make_service())
138 .await
139 .unwrap();
140}