Skip to main content

meshlet_server/
transport.rs

1use std::path::PathBuf;
2use std::sync::Arc;
3
4use axum::extract::State;
5use axum::http::StatusCode;
6use axum::response::IntoResponse;
7use axum::routing::post;
8use axum::{Json, Router};
9use meshlet_proto::messages::{SyncRequest, SyncResponse};
10use tokio::sync::Mutex;
11
12use crate::doc::ServerDoc;
13
14pub struct AppState {
15    pub doc: Mutex<ServerDoc>,
16    pub token: Option<String>,
17    pub data_dir: PathBuf,
18}
19
20pub fn app_router(state: Arc<AppState>) -> Router {
21    Router::new()
22        .route("/sync", post(sync_handler))
23        .with_state(state)
24}
25
26pub async fn sync_handler(
27    State(state): State<Arc<AppState>>,
28    headers: axum::http::HeaderMap,
29    Json(request): Json<SyncRequest>,
30) -> impl IntoResponse {
31    if let Some(ref expected_token) = state.token {
32        let auth_header = headers
33            .get("authorization")
34            .and_then(|v| v.to_str().ok())
35            .unwrap_or("");
36
37        let token = auth_header.strip_prefix("Bearer ").unwrap_or("");
38
39        if token != expected_token {
40            return (
41                StatusCode::UNAUTHORIZED,
42                Json(serde_json::json!({"error": "unauthorized"})),
43            )
44                .into_response();
45        }
46    }
47
48    let doc = state.doc.lock().await;
49
50    let client_updates = match SyncRequest::updates(&request) {
51        Ok(u) => u,
52        Err(e) => {
53            tracing::error!("base64 decode failed: {}", e);
54            return (
55                StatusCode::BAD_REQUEST,
56                Json(serde_json::json!({"error": "invalid base64 encoding"})),
57            )
58                .into_response();
59        }
60    };
61    let client_vv = match SyncRequest::client_vv(&request) {
62        Some(vv) => vv,
63        None => {
64            return (
65                StatusCode::BAD_REQUEST,
66                Json(serde_json::json!({"error": "invalid client_vv"})),
67            )
68                .into_response();
69        }
70    };
71
72    if !client_updates.is_empty()
73        && let Err(e) = doc.import(&client_updates)
74    {
75        tracing::error!("import failed: {}", e);
76        return (
77            StatusCode::INTERNAL_SERVER_ERROR,
78            Json(serde_json::json!({"error": "import failed"})),
79        )
80            .into_response();
81    }
82
83    let server_updates = match doc.export_updates_since(&client_vv) {
84        Ok(u) => u,
85        Err(e) => {
86            tracing::error!("export failed: {}", e);
87            return (
88                StatusCode::INTERNAL_SERVER_ERROR,
89                Json(serde_json::json!({"error": "export failed"})),
90            )
91                .into_response();
92        }
93    };
94
95    let server_vv = doc.oplog_vv();
96
97    if let Err(e) = doc.save(&state.data_dir) {
98        tracing::error!("save failed: {}", e);
99    }
100
101    let response = SyncResponse::new(&server_vv, &server_updates);
102    (StatusCode::OK, Json(response)).into_response()
103}