Skip to main content

cedros_data/
http.rs

1use std::sync::Arc;
2#[cfg(feature = "cedros-login-profile")]
3use std::time::Duration;
4
5use axum::{
6    extract::{Path, State},
7    http::{
8        header::{self, HeaderName, HeaderValue},
9        HeaderMap, Method, StatusCode,
10    },
11    response::{IntoResponse, Response},
12    routing::{get, post, put},
13    Json, Router,
14};
15use serde::Deserialize;
16use serde_json::json;
17use tokio::net::TcpListener;
18use tower_http::{
19    cors::{AllowOrigin, CorsLayer},
20    trace::TraceLayer,
21};
22use tracing::{error, info};
23
24use crate::config::HttpServerConfig;
25use crate::error::CedrosDataError;
26use crate::http_auth::CedrosDataPermission;
27#[cfg(feature = "cedros-login-profile")]
28use crate::http_auth::{
29    derive_permissions_url, enforce_permission_with_cedros_login, AuthzError, CedrosLoginState,
30};
31use crate::http_auth::{read_permission_for_collection, write_permission_for_collection};
32use crate::models::{
33    CollectionMode, ContractSchema, ExportSiteRequest, ImportSiteRequest, QueryEntriesRequest,
34    RegisterCollectionRequest, RegisterCustomSchemaRequest, RegisterSiteRequest,
35    UpsertEntryRequest, VerifyContractRequest,
36};
37use crate::store::CedrosData;
38
39#[derive(Clone)]
40pub(crate) struct AppState {
41    pub(crate) store: CedrosData,
42    #[cfg(feature = "cedros-login-profile")]
43    pub(crate) cedros_auth: Option<CedrosLoginState>,
44    #[cfg(feature = "storage")]
45    pub(crate) storage: Option<crate::storage::StorageClient>,
46}
47
48pub async fn run_http_server(
49    store: CedrosData,
50    config: HttpServerConfig,
51    #[cfg(feature = "storage")] storage: Option<crate::storage::StorageClient>,
52) -> crate::error::Result<()> {
53    store.migrate().await?;
54    store.bootstrap_defaults().await?;
55    let app_state = build_state(
56        store,
57        &config,
58        #[cfg(feature = "storage")]
59        storage,
60    )?;
61    let app = build_router(app_state, &config)?;
62    let listener = TcpListener::bind(&config.bind_addr).await?;
63    info!(bind_addr = %config.bind_addr, "cedros-data HTTP server listening");
64    axum::serve(listener, app).await.map_err(Into::into)
65}
66
/// Connect timeout applied to the HTTP client used for cedros-login calls.
#[cfg(feature = "cedros-login-profile")]
const CEDROS_LOGIN_CONNECT_TIMEOUT: Duration = Duration::from_secs(2);
/// Overall per-request timeout applied to the HTTP client used for cedros-login calls.
#[cfg(feature = "cedros-login-profile")]
const CEDROS_LOGIN_REQUEST_TIMEOUT: Duration = Duration::from_secs(5);
71
72fn build_state(
73    store: CedrosData,
74    config: &HttpServerConfig,
75    #[cfg(feature = "storage")] storage: Option<crate::storage::StorageClient>,
76) -> crate::error::Result<AppState> {
77    validate_http_auth_configuration(config)?;
78
79    #[cfg(feature = "cedros-login-profile")]
80    {
81        let cedros_auth = if config.enable_cedros_login_profile {
82            let verify_url = config.cedros_login_verify_url.clone().ok_or_else(|| {
83                CedrosDataError::InvalidRequest(
84                    "cedros_login_verify_url is required when compatibility profile is enabled"
85                        .to_string(),
86                )
87            })?;
88            let permissions_url = derive_permissions_url(&verify_url)?;
89            let client = reqwest::Client::builder()
90                .connect_timeout(CEDROS_LOGIN_CONNECT_TIMEOUT)
91                .timeout(CEDROS_LOGIN_REQUEST_TIMEOUT)
92                .build()
93                .map_err(|error| {
94                    CedrosDataError::InvalidRequest(format!(
95                        "failed to build cedros-login HTTP client: {error}"
96                    ))
97                })?;
98            Some(CedrosLoginState {
99                permissions_url,
100                client,
101            })
102        } else {
103            None
104        };
105
106        Ok(AppState {
107            store,
108            cedros_auth,
109            #[cfg(feature = "storage")]
110            storage,
111        })
112    }
113
114    #[cfg(not(feature = "cedros-login-profile"))]
115    {
116        if config.enable_cedros_login_profile {
117            return Err(CedrosDataError::InvalidRequest(
118                "cedros-login-profile feature is not enabled".to_string(),
119            ));
120        }
121        Ok(AppState {
122            store,
123            #[cfg(feature = "storage")]
124            storage,
125        })
126    }
127}
128
129fn validate_http_auth_configuration(config: &HttpServerConfig) -> crate::error::Result<()> {
130    if config.enable_cedros_login_profile || config.allow_unauthenticated_http {
131        return Ok(());
132    }
133
134    Err(CedrosDataError::InvalidRequest(
135        "embedded HTTP server requires authentication by default; pass --allow-unauthenticated for local development or enable --cedros-login-profile".to_string(),
136    ))
137}
138
139fn build_router(state: AppState, config: &HttpServerConfig) -> crate::error::Result<Router> {
140    #[allow(unused_mut)]
141    let mut app = Router::new()
142        .route("/migrate", post(migrate))
143        .route("/site", post(register_site))
144        .route("/collections", post(register_collection))
145        .route("/custom-schema", post(register_custom_schema))
146        .route("/entries/upsert", post(upsert_entry))
147        .route("/entries/query", post(query_entries))
148        .route("/site/export", get(export_site))
149        .route("/import", post(import_site))
150        .route("/contract/verify", post(verify_contract))
151        .route("/admin/default-pages", get(default_pages))
152        .route("/admin/bootstrap", post(bootstrap_site))
153        .route(
154            "/admin/collections",
155            get(list_site_collections).post(register_site_collection),
156        )
157        .route("/admin/pages", get(list_site_pages))
158        .route("/admin/pages/{page_key}", put(upsert_site_page))
159        // Public config (no auth)
160        .route("/site/config/{key}", get(crate::http_public::public_site_config));
161
162    #[cfg(feature = "storage")]
163    {
164        use crate::http_storage;
165        app = app
166            .route("/admin/storage/config", get(http_storage::storage_config))
167            .route("/admin/storage/test", post(http_storage::storage_test))
168            .route("/admin/media/upload", post(http_storage::media_upload))
169            .route("/admin/media/assets", get(http_storage::list_media_assets))
170            .route(
171                "/admin/media/assets/{asset_id}",
172                get(http_storage::get_media_asset)
173                    .put(http_storage::update_media_asset)
174                    .delete(http_storage::delete_media_asset),
175            );
176    }
177
178    let mut router = app
179        .layer(TraceLayer::new_for_http())
180        .with_state(Arc::new(state));
181
182    if let Some(cors_layer) = configured_cors_layer(config)? {
183        router = router.layer(cors_layer);
184    }
185
186    Ok(router)
187}
188
189fn configured_cors_layer(config: &HttpServerConfig) -> crate::error::Result<Option<CorsLayer>> {
190    if config.cors_allowed_origins.is_empty() {
191        return Ok(None);
192    }
193
194    let allow_origin = parse_cors_allowed_origins(&config.cors_allowed_origins)?;
195    let cors_layer = CorsLayer::new()
196        .allow_origin(allow_origin)
197        .allow_methods([Method::GET, Method::POST, Method::PUT, Method::OPTIONS])
198        .allow_headers([
199            header::AUTHORIZATION,
200            header::CONTENT_TYPE,
201            header::ACCEPT,
202            HeaderName::from_static("x-cedros-org-id"),
203        ]);
204
205    Ok(Some(cors_layer))
206}
207
208fn parse_cors_allowed_origins(origins: &[String]) -> crate::error::Result<AllowOrigin> {
209    if origins.iter().any(|origin| origin == "*") {
210        if origins.len() > 1 {
211            return Err(CedrosDataError::InvalidRequest(
212                "cors origins cannot mix '*' with explicit origins".to_string(),
213            ));
214        }
215
216        return Ok(AllowOrigin::any());
217    }
218
219    let header_values = origins
220        .iter()
221        .map(|origin| {
222            HeaderValue::from_str(origin).map_err(|error| {
223                CedrosDataError::InvalidRequest(format!(
224                    "invalid cors origin \"{origin}\": {error}"
225                ))
226            })
227        })
228        .collect::<crate::error::Result<Vec<_>>>()?;
229
230    Ok(AllowOrigin::list(header_values))
231}
232
233async fn migrate(
234    State(state): State<Arc<AppState>>,
235    headers: HeaderMap,
236) -> Result<Json<serde_json::Value>, AppError> {
237    enforce_org_permission(&state, &headers, CedrosDataPermission::DataAdmin).await?;
238    state.store.migrate().await?;
239    Ok(Json(json!({"ok": true})))
240}
241
242async fn register_site(
243    State(state): State<Arc<AppState>>,
244    headers: HeaderMap,
245    Json(request): Json<RegisterSiteRequest>,
246) -> Result<Json<serde_json::Value>, AppError> {
247    enforce_org_permission(&state, &headers, CedrosDataPermission::DataAdmin).await?;
248    let site = state.store.register_site(request).await?;
249    Ok(Json(json!(site)))
250}
251
252async fn register_collection(
253    State(state): State<Arc<AppState>>,
254    headers: HeaderMap,
255    Json(request): Json<RegisterCollectionRequest>,
256) -> Result<Json<serde_json::Value>, AppError> {
257    enforce_org_permission(&state, &headers, CedrosDataPermission::CollectionsWrite).await?;
258    let collection = state.store.register_collection(request).await?;
259    Ok(Json(json!(collection)))
260}
261
262async fn register_custom_schema(
263    State(state): State<Arc<AppState>>,
264    headers: HeaderMap,
265    Json(request): Json<RegisterCustomSchemaRequest>,
266) -> Result<Json<serde_json::Value>, AppError> {
267    enforce_org_permission(&state, &headers, CedrosDataPermission::SchemaWrite).await?;
268    let report = state.store.register_custom_schema(request).await?;
269    Ok(Json(json!(report)))
270}
271
272async fn upsert_entry(
273    State(state): State<Arc<AppState>>,
274    headers: HeaderMap,
275    Json(request): Json<UpsertEntryRequest>,
276) -> Result<Json<serde_json::Value>, AppError> {
277    enforce_org_permission(
278        &state,
279        &headers,
280        write_permission_for_collection(&request.collection_name),
281    )
282    .await?;
283    let entry = state.store.upsert_entry(request).await?;
284    Ok(Json(json!(entry)))
285}
286
287async fn query_entries(
288    State(state): State<Arc<AppState>>,
289    headers: HeaderMap,
290    Json(request): Json<QueryEntriesRequest>,
291) -> Result<Json<serde_json::Value>, AppError> {
292    enforce_org_permission(
293        &state,
294        &headers,
295        read_permission_for_collection(&request.collection_name),
296    )
297    .await?;
298    let visitor_id = request.visitor_id.clone();
299    let record_reads = request.entry_keys.len() == 1;
300    let mut entries = state.store.query_entries(request).await?;
301
302    if let Some(ref vid) = visitor_id {
303        crate::store::metered_reads::validate_visitor_id(vid)?;
304        state
305            .store
306            .apply_content_gating(&mut entries, vid, record_reads)
307            .await?;
308    }
309
310    Ok(Json(json!(entries)))
311}
312
313async fn export_site(
314    State(state): State<Arc<AppState>>,
315    headers: HeaderMap,
316) -> Result<Json<serde_json::Value>, AppError> {
317    enforce_org_permission(&state, &headers, CedrosDataPermission::OpsWrite).await?;
318    let export = state.store.export_site(ExportSiteRequest).await?;
319    Ok(Json(json!(export)))
320}
321
322async fn import_site(
323    State(state): State<Arc<AppState>>,
324    headers: HeaderMap,
325    Json(request): Json<ImportSiteRequest>,
326) -> Result<Json<serde_json::Value>, AppError> {
327    enforce_org_permission(&state, &headers, CedrosDataPermission::OpsWrite).await?;
328    let result = state.store.import_site(request).await?;
329    Ok(Json(json!(result)))
330}
331
332async fn verify_contract(
333    State(state): State<Arc<AppState>>,
334    headers: HeaderMap,
335    Json(request): Json<VerifyContractRequest>,
336) -> Result<Json<serde_json::Value>, AppError> {
337    enforce_org_permission(&state, &headers, CedrosDataPermission::ContractRead).await?;
338    let report = state.store.verify_contract(request).await?;
339    Ok(Json(json!(report)))
340}
341
342async fn default_pages(
343    State(state): State<Arc<AppState>>,
344    headers: HeaderMap,
345) -> Result<Json<serde_json::Value>, AppError> {
346    enforce_org_permission(&state, &headers, CedrosDataPermission::PagesRead).await?;
347    Ok(Json(json!(crate::defaults::default_page_templates())))
348}
349
350async fn bootstrap_site(
351    State(state): State<Arc<AppState>>,
352    headers: HeaderMap,
353) -> Result<Json<serde_json::Value>, AppError> {
354    enforce_org_permission(&state, &headers, CedrosDataPermission::OpsWrite).await?;
355    let report = state.store.bootstrap_defaults().await?;
356    Ok(Json(json!(report)))
357}
358
359async fn list_site_collections(
360    State(state): State<Arc<AppState>>,
361    headers: HeaderMap,
362) -> Result<Json<serde_json::Value>, AppError> {
363    enforce_org_permission(&state, &headers, CedrosDataPermission::CollectionsRead).await?;
364    let collections = state.store.list_collections().await?;
365    Ok(Json(json!(collections)))
366}
367
368#[derive(Debug, Deserialize)]
369struct RegisterSiteCollectionRequest {
370    collection_name: String,
371    mode: Option<CollectionMode>,
372    table_name: Option<String>,
373    strict_contract: Option<ContractSchema>,
374}
375
376async fn register_site_collection(
377    State(state): State<Arc<AppState>>,
378    headers: HeaderMap,
379    Json(request): Json<RegisterSiteCollectionRequest>,
380) -> Result<Json<serde_json::Value>, AppError> {
381    enforce_org_permission(&state, &headers, CedrosDataPermission::CollectionsWrite).await?;
382    let collection = state
383        .store
384        .register_collection(RegisterCollectionRequest {
385            collection_name: request.collection_name,
386            mode: request.mode.unwrap_or(CollectionMode::Jsonb),
387            table_name: request.table_name,
388            strict_contract: request.strict_contract,
389        })
390        .await?;
391    Ok(Json(json!(collection)))
392}
393
394async fn list_site_pages(
395    State(state): State<Arc<AppState>>,
396    headers: HeaderMap,
397) -> Result<Json<serde_json::Value>, AppError> {
398    enforce_org_permission(&state, &headers, CedrosDataPermission::PagesRead).await?;
399    let pages = state.store.list_default_page_entries().await?;
400    Ok(Json(json!(pages)))
401}
402
403#[derive(Debug, Deserialize)]
404struct UpsertPageRequest {
405    payload: serde_json::Value,
406}
407
408async fn upsert_site_page(
409    Path(page_key): Path<String>,
410    State(state): State<Arc<AppState>>,
411    headers: HeaderMap,
412    Json(request): Json<UpsertPageRequest>,
413) -> Result<Json<serde_json::Value>, AppError> {
414    enforce_org_permission(&state, &headers, CedrosDataPermission::PagesWrite).await?;
415    let page = state
416        .store
417        .upsert_default_page_entry(&page_key, request.payload)
418        .await?;
419    Ok(Json(json!(page)))
420}
421
422#[allow(unused_variables)]
423pub(crate) async fn enforce_org_permission(
424    state: &Arc<AppState>,
425    headers: &HeaderMap,
426    permission: CedrosDataPermission,
427) -> Result<(), AppError> {
428    #[cfg(feature = "cedros-login-profile")]
429    if let Some(auth_state) = state.cedros_auth.as_ref() {
430        enforce_permission_with_cedros_login(auth_state, headers, permission)
431            .await
432            .map_err(map_authz_error)?;
433    }
434    Ok(())
435}
436
/// Converts an authorization failure into an [`AppError`].
///
/// Server-side failures (5xx) are logged; the status and message are passed
/// through to the response in every case.
#[cfg(feature = "cedros-login-profile")]
fn map_authz_error(error: AuthzError) -> AppError {
    let AuthzError { status, message, .. } = error;

    if status.is_server_error() {
        error!(
            status = %status,
            detail = %message,
            "authorization request failed"
        );
    }

    AppError::new(status, message)
}
448
449pub(crate) struct AppError {
450    status: StatusCode,
451    message: String,
452}
453
454impl AppError {
455    pub(crate) fn new(status: StatusCode, message: String) -> Self {
456        Self { status, message }
457    }
458}
459
460impl From<CedrosDataError> for AppError {
461    fn from(error: CedrosDataError) -> Self {
462        match error {
463            CedrosDataError::SiteNotConfigured | CedrosDataError::CollectionNotFound(_) => {
464                Self::new(StatusCode::NOT_FOUND, error.to_string())
465            }
466            CedrosDataError::BreakingSchemaChange(_)
467            | CedrosDataError::ContractVerificationFailed(_)
468            | CedrosDataError::InvalidIdentifier(_)
469            | CedrosDataError::InvalidRequest(_)
470            | CedrosDataError::InvalidTypeExpression(_) => {
471                Self::new(StatusCode::BAD_REQUEST, error.to_string())
472            }
473            CedrosDataError::MissingPostgresUri => {
474                Self::new(StatusCode::BAD_REQUEST, error.to_string())
475            }
476            CedrosDataError::Storage(_) => {
477                error!(detail = %error, "storage request failed");
478                Self::new(
479                    StatusCode::INTERNAL_SERVER_ERROR,
480                    "storage error".to_string(),
481                )
482            }
483            CedrosDataError::Database(_)
484            | CedrosDataError::Migration(_)
485            | CedrosDataError::Json(_)
486            | CedrosDataError::Io(_) => {
487                error!(detail = %error, "cedros-data request failed");
488                Self::new(
489                    StatusCode::INTERNAL_SERVER_ERROR,
490                    "internal server error".to_string(),
491                )
492            }
493        }
494    }
495}
496
497impl IntoResponse for AppError {
498    fn into_response(self) -> Response {
499        (self.status, Json(json!({ "error": self.message }))).into_response()
500    }
501}
502
503#[cfg(test)]
504mod tests;