mongodb_migrator/
server.rs

1use std::{net::SocketAddr, sync::Arc};
2
3use axum::{
4    extract::{Path, State},
5    http::StatusCode,
6    routing::post,
7    Router,
8};
9use tokio::sync::Mutex;
10use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
11
12use crate::{
13    migration::Migration,
14    migrator::{default::DefaultMigrator, with_migrations_vec::WithMigrationsVec},
15};
16
17struct AppState {
18    migrator: WithMigrationsVec,
19}
20
21type SharedState = Arc<Mutex<AppState>>;
22
23pub struct ServiceParams {
24    pub migrator: MigratorParams,
25    pub server: ServerParams,
26}
27
28pub struct MigratorParams {
29    pub db: DbParams,
30    pub migrations: Vec<Box<dyn Migration>>,
31}
32
33pub struct ServerParams {
34    port: u16,
35}
36
37impl Default for ServiceParams {
38    fn default() -> Self {
39        Self {
40            migrator: MigratorParams {
41                db: DbParams {
42                    connection_string: "mongodb://localhost:27017".to_string(),
43                    log_into_db_name: "test".to_string(),
44                },
45                migrations: vec![],
46            },
47            server: ServerParams { port: 3000 },
48        }
49    }
50}
51
52pub struct DbParams {
53    pub connection_string: String,
54    pub log_into_db_name: String,
55}
56
57pub async fn server(params: ServiceParams) {
58    init_tracing();
59
60    let migrator = init_migrator(params.migrator).await;
61
62    run_server(
63        init_routing(Arc::new(Mutex::new(AppState { migrator }))),
64        params.server.port,
65    )
66    .await;
67}
68
69fn ups() -> Router<SharedState> {
70    Router::new().route("/:id", post(up_migration_with_id))
71}
72
73fn downs() -> Router<SharedState> {
74    Router::new().route("/:id", post(down_migration_with_id))
75}
76
77async fn up_migration_with_id(
78    Path(id): Path<String>,
79    State(state): State<SharedState>,
80) -> StatusCode {
81    let r = state.lock().await.migrator.up_single_from_vec(id).await;
82
83    if r.is_ok() {
84        StatusCode::OK
85    } else {
86        StatusCode::INTERNAL_SERVER_ERROR
87    }
88}
89
90async fn down_migration_with_id(
91    Path(id): Path<String>,
92    State(state): State<SharedState>,
93) -> StatusCode {
94    let r = state.lock().await.migrator.down_single_from_vec(id).await;
95
96    if r.is_ok() {
97        StatusCode::OK
98    } else {
99        StatusCode::INTERNAL_SERVER_ERROR
100    }
101}
102
103fn init_tracing() {
104    tracing_subscriber::registry()
105        .with(
106            tracing_subscriber::EnvFilter::try_from_default_env()
107                .unwrap_or_else(|_| "mongodb-migrator=debug,tower_http=debug".into()),
108        )
109        .with(tracing_subscriber::fmt::layer())
110        .init();
111}
112
113fn init_routing(shared_state: SharedState) -> Router {
114    Router::new()
115        .nest("/up", ups())
116        .nest("/down", downs())
117        .with_state(shared_state)
118}
119
120async fn init_migrator(params: MigratorParams) -> WithMigrationsVec {
121    DefaultMigrator::new()
122        .with_conn(
123            mongodb::Client::with_uri_str(params.db.connection_string)
124                .await
125                .expect("mongodb client created")
126                .database(&params.db.log_into_db_name),
127        )
128        .with_migrations_vec(params.migrations)
129}
130
131async fn run_server(router: Router, port: u16) {
132    let addr = SocketAddr::from(([127, 0, 0, 1], port));
133
134    tracing::debug!("listening on {}", addr);
135
136    axum::Server::bind(&addr)
137        .serve(router.into_make_service())
138        .await
139        .unwrap();
140}