meshlet_server/
transport.rs1use std::path::PathBuf;
2use std::sync::Arc;
3
4use axum::extract::State;
5use axum::http::StatusCode;
6use axum::response::IntoResponse;
7use axum::routing::post;
8use axum::{Json, Router};
9use meshlet_proto::messages::{SyncRequest, SyncResponse};
10use tokio::sync::Mutex;
11
12use crate::doc::ServerDoc;
13
14pub struct AppState {
15 pub doc: Mutex<ServerDoc>,
16 pub token: Option<String>,
17 pub data_dir: PathBuf,
18}
19
20pub fn app_router(state: Arc<AppState>) -> Router {
21 Router::new()
22 .route("/sync", post(sync_handler))
23 .with_state(state)
24}
25
26pub async fn sync_handler(
27 State(state): State<Arc<AppState>>,
28 headers: axum::http::HeaderMap,
29 Json(request): Json<SyncRequest>,
30) -> impl IntoResponse {
31 if let Some(ref expected_token) = state.token {
32 let auth_header = headers
33 .get("authorization")
34 .and_then(|v| v.to_str().ok())
35 .unwrap_or("");
36
37 let token = auth_header.strip_prefix("Bearer ").unwrap_or("");
38
39 if token != expected_token {
40 return (
41 StatusCode::UNAUTHORIZED,
42 Json(serde_json::json!({"error": "unauthorized"})),
43 )
44 .into_response();
45 }
46 }
47
48 let doc = state.doc.lock().await;
49
50 let client_updates = match SyncRequest::updates(&request) {
51 Ok(u) => u,
52 Err(e) => {
53 tracing::error!("base64 decode failed: {}", e);
54 return (
55 StatusCode::BAD_REQUEST,
56 Json(serde_json::json!({"error": "invalid base64 encoding"})),
57 )
58 .into_response();
59 }
60 };
61 let client_vv = match SyncRequest::client_vv(&request) {
62 Some(vv) => vv,
63 None => {
64 return (
65 StatusCode::BAD_REQUEST,
66 Json(serde_json::json!({"error": "invalid client_vv"})),
67 )
68 .into_response();
69 }
70 };
71
72 if !client_updates.is_empty()
73 && let Err(e) = doc.import(&client_updates)
74 {
75 tracing::error!("import failed: {}", e);
76 return (
77 StatusCode::INTERNAL_SERVER_ERROR,
78 Json(serde_json::json!({"error": "import failed"})),
79 )
80 .into_response();
81 }
82
83 let server_updates = match doc.export_updates_since(&client_vv) {
84 Ok(u) => u,
85 Err(e) => {
86 tracing::error!("export failed: {}", e);
87 return (
88 StatusCode::INTERNAL_SERVER_ERROR,
89 Json(serde_json::json!({"error": "export failed"})),
90 )
91 .into_response();
92 }
93 };
94
95 let server_vv = doc.oplog_vv();
96
97 if let Err(e) = doc.save(&state.data_dir) {
98 tracing::error!("save failed: {}", e);
99 }
100
101 let response = SyncResponse::new(&server_vv, &server_updates);
102 (StatusCode::OK, Json(response)).into_response()
103}