Skip to main content

cedros_data/
http.rs

1use std::sync::Arc;
2#[cfg(feature = "cedros-login-profile")]
3use std::time::Duration;
4
5use axum::{
6    extract::{Path, State},
7    http::{
8        header::{self, HeaderName, HeaderValue},
9        HeaderMap, Method, StatusCode,
10    },
11    response::{IntoResponse, Response},
12    routing::{get, post, put},
13    Json, Router,
14};
15use serde::Deserialize;
16use serde_json::json;
17use tokio::net::TcpListener;
18use tower_http::{
19    cors::{AllowOrigin, CorsLayer},
20    trace::TraceLayer,
21};
22use tracing::{error, info};
23
24use crate::config::HttpServerConfig;
25use crate::error::CedrosDataError;
26use crate::http_auth::CedrosDataPermission;
27#[cfg(feature = "cedros-login-profile")]
28use crate::http_auth::{
29    derive_permissions_url, enforce_permission_with_cedros_login, AuthzError, CedrosLoginState,
30};
31use crate::http_auth::{read_permission_for_collection, write_permission_for_collection};
32use crate::models::{
33    CollectionMode, ContractSchema, ExportSiteRequest, ImportSiteRequest, QueryEntriesRequest,
34    RegisterCollectionRequest, RegisterCustomSchemaRequest, RegisterSiteRequest,
35    UpsertEntryRequest, VerifyContractRequest,
36};
37use crate::store::CedrosData;
38
39#[derive(Clone)]
40pub(crate) struct AppState {
41    pub(crate) store: CedrosData,
42    #[cfg(feature = "cedros-login-profile")]
43    pub(crate) cedros_auth: Option<CedrosLoginState>,
44    #[cfg(feature = "storage")]
45    pub(crate) storage: Option<crate::storage::StorageClient>,
46}
47
48pub async fn run_http_server(
49    store: CedrosData,
50    config: HttpServerConfig,
51    #[cfg(feature = "storage")] storage: Option<crate::storage::StorageClient>,
52) -> crate::error::Result<()> {
53    store.migrate().await?;
54    store.bootstrap_defaults().await?;
55    let app_state = build_state(
56        store,
57        &config,
58        #[cfg(feature = "storage")]
59        storage,
60    )?;
61    let app = build_router(app_state, &config)?;
62    let listener = TcpListener::bind(&config.bind_addr).await?;
63    info!(bind_addr = %config.bind_addr, "cedros-data HTTP server listening");
64    axum::serve(listener, app).await.map_err(Into::into)
65}
66
/// Connect timeout applied to the HTTP client used for cedros-login requests.
#[cfg(feature = "cedros-login-profile")]
const CEDROS_LOGIN_CONNECT_TIMEOUT: Duration = Duration::from_secs(2);
/// Overall request timeout applied to the HTTP client used for cedros-login requests.
#[cfg(feature = "cedros-login-profile")]
const CEDROS_LOGIN_REQUEST_TIMEOUT: Duration = Duration::from_secs(5);
71
72fn build_state(
73    store: CedrosData,
74    config: &HttpServerConfig,
75    #[cfg(feature = "storage")] storage: Option<crate::storage::StorageClient>,
76) -> crate::error::Result<AppState> {
77    validate_http_auth_configuration(config)?;
78
79    #[cfg(feature = "cedros-login-profile")]
80    {
81        let cedros_auth = if config.enable_cedros_login_profile {
82            let verify_url = config.cedros_login_verify_url.clone().ok_or_else(|| {
83                CedrosDataError::InvalidRequest(
84                    "cedros_login_verify_url is required when compatibility profile is enabled"
85                        .to_string(),
86                )
87            })?;
88            let permissions_url = derive_permissions_url(&verify_url)?;
89            let client = reqwest::Client::builder()
90                .connect_timeout(CEDROS_LOGIN_CONNECT_TIMEOUT)
91                .timeout(CEDROS_LOGIN_REQUEST_TIMEOUT)
92                .build()
93                .map_err(|error| {
94                    CedrosDataError::InvalidRequest(format!(
95                        "failed to build cedros-login HTTP client: {error}"
96                    ))
97                })?;
98            Some(CedrosLoginState {
99                permissions_url,
100                client,
101            })
102        } else {
103            None
104        };
105
106        Ok(AppState {
107            store,
108            cedros_auth,
109            #[cfg(feature = "storage")]
110            storage,
111        })
112    }
113
114    #[cfg(not(feature = "cedros-login-profile"))]
115    {
116        if config.enable_cedros_login_profile {
117            return Err(CedrosDataError::InvalidRequest(
118                "cedros-login-profile feature is not enabled".to_string(),
119            ));
120        }
121        Ok(AppState {
122            store,
123            #[cfg(feature = "storage")]
124            storage,
125        })
126    }
127}
128
129fn validate_http_auth_configuration(config: &HttpServerConfig) -> crate::error::Result<()> {
130    if config.enable_cedros_login_profile || config.allow_unauthenticated_http {
131        return Ok(());
132    }
133
134    Err(CedrosDataError::InvalidRequest(
135        "embedded HTTP server requires authentication by default; pass --allow-unauthenticated for local development or enable --cedros-login-profile".to_string(),
136    ))
137}
138
139fn build_router(state: AppState, config: &HttpServerConfig) -> crate::error::Result<Router> {
140    #[allow(unused_mut)]
141    let mut app = Router::new()
142        .route("/migrate", post(migrate))
143        .route("/site", post(register_site))
144        .route("/collections", post(register_collection))
145        .route("/custom-schema", post(register_custom_schema))
146        .route("/entries/upsert", post(upsert_entry))
147        .route("/entries/query", post(query_entries))
148        .route("/site/export", get(export_site))
149        .route("/import", post(import_site))
150        .route("/contract/verify", post(verify_contract))
151        .route("/admin/default-pages", get(default_pages))
152        .route("/admin/bootstrap", post(bootstrap_site))
153        .route(
154            "/admin/collections",
155            get(list_site_collections).post(register_site_collection),
156        )
157        .route("/admin/pages", get(list_site_pages))
158        .route("/admin/pages/{page_key}", put(upsert_site_page))
159        // Public config (no auth)
160        .route("/site/config/{key}", get(crate::http_public::public_site_config))
161        .merge(crate::http_discovery::discovery_routes());
162
163    #[cfg(feature = "storage")]
164    {
165        use crate::http_storage;
166        app = app
167            .route("/admin/storage/config", get(http_storage::storage_config))
168            .route("/admin/storage/test", post(http_storage::storage_test))
169            .route("/admin/media/upload", post(http_storage::media_upload))
170            .route("/admin/media/assets", get(http_storage::list_media_assets))
171            .route(
172                "/admin/media/assets/{asset_id}",
173                get(http_storage::get_media_asset)
174                    .put(http_storage::update_media_asset)
175                    .delete(http_storage::delete_media_asset),
176            );
177    }
178
179    let mut router = app
180        .layer(TraceLayer::new_for_http())
181        .with_state(Arc::new(state));
182
183    if let Some(cors_layer) = configured_cors_layer(config)? {
184        router = router.layer(cors_layer);
185    }
186
187    Ok(router)
188}
189
190fn configured_cors_layer(config: &HttpServerConfig) -> crate::error::Result<Option<CorsLayer>> {
191    if config.cors_allowed_origins.is_empty() {
192        return Ok(None);
193    }
194
195    let allow_origin = parse_cors_allowed_origins(&config.cors_allowed_origins)?;
196    let cors_layer = CorsLayer::new()
197        .allow_origin(allow_origin)
198        .allow_methods([Method::GET, Method::POST, Method::PUT, Method::DELETE, Method::OPTIONS])
199        .allow_headers([
200            header::AUTHORIZATION,
201            header::CONTENT_TYPE,
202            header::ACCEPT,
203            HeaderName::from_static("x-cedros-org-id"),
204        ]);
205
206    Ok(Some(cors_layer))
207}
208
209fn parse_cors_allowed_origins(origins: &[String]) -> crate::error::Result<AllowOrigin> {
210    if origins.iter().any(|origin| origin == "*") {
211        if origins.len() > 1 {
212            return Err(CedrosDataError::InvalidRequest(
213                "cors origins cannot mix '*' with explicit origins".to_string(),
214            ));
215        }
216
217        return Ok(AllowOrigin::any());
218    }
219
220    let header_values = origins
221        .iter()
222        .map(|origin| {
223            HeaderValue::from_str(origin).map_err(|error| {
224                CedrosDataError::InvalidRequest(format!(
225                    "invalid cors origin \"{origin}\": {error}"
226                ))
227            })
228        })
229        .collect::<crate::error::Result<Vec<_>>>()?;
230
231    Ok(AllowOrigin::list(header_values))
232}
233
234async fn migrate(
235    State(state): State<Arc<AppState>>,
236    headers: HeaderMap,
237) -> Result<Json<serde_json::Value>, AppError> {
238    enforce_org_permission(&state, &headers, CedrosDataPermission::DataAdmin).await?;
239    state.store.migrate().await?;
240    Ok(Json(json!({"ok": true})))
241}
242
243async fn register_site(
244    State(state): State<Arc<AppState>>,
245    headers: HeaderMap,
246    Json(request): Json<RegisterSiteRequest>,
247) -> Result<Json<serde_json::Value>, AppError> {
248    enforce_org_permission(&state, &headers, CedrosDataPermission::DataAdmin).await?;
249    let site = state.store.register_site(request).await?;
250    Ok(Json(json!(site)))
251}
252
253async fn register_collection(
254    State(state): State<Arc<AppState>>,
255    headers: HeaderMap,
256    Json(request): Json<RegisterCollectionRequest>,
257) -> Result<Json<serde_json::Value>, AppError> {
258    enforce_org_permission(&state, &headers, CedrosDataPermission::CollectionsWrite).await?;
259    let collection = state.store.register_collection(request).await?;
260    Ok(Json(json!(collection)))
261}
262
263async fn register_custom_schema(
264    State(state): State<Arc<AppState>>,
265    headers: HeaderMap,
266    Json(request): Json<RegisterCustomSchemaRequest>,
267) -> Result<Json<serde_json::Value>, AppError> {
268    enforce_org_permission(&state, &headers, CedrosDataPermission::SchemaWrite).await?;
269    let report = state.store.register_custom_schema(request).await?;
270    Ok(Json(json!(report)))
271}
272
273async fn upsert_entry(
274    State(state): State<Arc<AppState>>,
275    headers: HeaderMap,
276    Json(request): Json<UpsertEntryRequest>,
277) -> Result<Json<serde_json::Value>, AppError> {
278    enforce_org_permission(
279        &state,
280        &headers,
281        write_permission_for_collection(&request.collection_name),
282    )
283    .await?;
284    let entry = state.store.upsert_entry(request).await?;
285    Ok(Json(json!(entry)))
286}
287
288async fn query_entries(
289    State(state): State<Arc<AppState>>,
290    headers: HeaderMap,
291    Json(request): Json<QueryEntriesRequest>,
292) -> Result<Json<serde_json::Value>, AppError> {
293    enforce_org_permission(
294        &state,
295        &headers,
296        read_permission_for_collection(&request.collection_name),
297    )
298    .await?;
299    let visitor_id = request.visitor_id.clone();
300    let record_reads = request.entry_keys.len() == 1;
301    let mut entries = state.store.query_entries(request).await?;
302
303    if let Some(ref vid) = visitor_id {
304        crate::store::metered_reads::validate_visitor_id(vid)?;
305        state
306            .store
307            .apply_content_gating(&mut entries, vid, record_reads)
308            .await?;
309    }
310
311    Ok(Json(json!(entries)))
312}
313
314async fn export_site(
315    State(state): State<Arc<AppState>>,
316    headers: HeaderMap,
317) -> Result<Json<serde_json::Value>, AppError> {
318    enforce_org_permission(&state, &headers, CedrosDataPermission::OpsWrite).await?;
319    let export = state.store.export_site(ExportSiteRequest).await?;
320    Ok(Json(json!(export)))
321}
322
323async fn import_site(
324    State(state): State<Arc<AppState>>,
325    headers: HeaderMap,
326    Json(request): Json<ImportSiteRequest>,
327) -> Result<Json<serde_json::Value>, AppError> {
328    enforce_org_permission(&state, &headers, CedrosDataPermission::OpsWrite).await?;
329    let result = state.store.import_site(request).await?;
330    Ok(Json(json!(result)))
331}
332
333async fn verify_contract(
334    State(state): State<Arc<AppState>>,
335    headers: HeaderMap,
336    Json(request): Json<VerifyContractRequest>,
337) -> Result<Json<serde_json::Value>, AppError> {
338    enforce_org_permission(&state, &headers, CedrosDataPermission::ContractRead).await?;
339    let report = state.store.verify_contract(request).await?;
340    Ok(Json(json!(report)))
341}
342
343async fn default_pages(
344    State(state): State<Arc<AppState>>,
345    headers: HeaderMap,
346) -> Result<Json<serde_json::Value>, AppError> {
347    enforce_org_permission(&state, &headers, CedrosDataPermission::PagesRead).await?;
348    Ok(Json(json!(crate::defaults::default_page_templates())))
349}
350
351async fn bootstrap_site(
352    State(state): State<Arc<AppState>>,
353    headers: HeaderMap,
354) -> Result<Json<serde_json::Value>, AppError> {
355    enforce_org_permission(&state, &headers, CedrosDataPermission::OpsWrite).await?;
356    let report = state.store.bootstrap_defaults().await?;
357    Ok(Json(json!(report)))
358}
359
360async fn list_site_collections(
361    State(state): State<Arc<AppState>>,
362    headers: HeaderMap,
363) -> Result<Json<serde_json::Value>, AppError> {
364    enforce_org_permission(&state, &headers, CedrosDataPermission::CollectionsRead).await?;
365    let collections = state.store.list_collections().await?;
366    Ok(Json(json!(collections)))
367}
368
369#[derive(Debug, Deserialize)]
370struct RegisterSiteCollectionRequest {
371    collection_name: String,
372    mode: Option<CollectionMode>,
373    table_name: Option<String>,
374    strict_contract: Option<ContractSchema>,
375}
376
377async fn register_site_collection(
378    State(state): State<Arc<AppState>>,
379    headers: HeaderMap,
380    Json(request): Json<RegisterSiteCollectionRequest>,
381) -> Result<Json<serde_json::Value>, AppError> {
382    enforce_org_permission(&state, &headers, CedrosDataPermission::CollectionsWrite).await?;
383    let collection = state
384        .store
385        .register_collection(RegisterCollectionRequest {
386            collection_name: request.collection_name,
387            mode: request.mode.unwrap_or(CollectionMode::Jsonb),
388            table_name: request.table_name,
389            strict_contract: request.strict_contract,
390        })
391        .await?;
392    Ok(Json(json!(collection)))
393}
394
395async fn list_site_pages(
396    State(state): State<Arc<AppState>>,
397    headers: HeaderMap,
398) -> Result<Json<serde_json::Value>, AppError> {
399    enforce_org_permission(&state, &headers, CedrosDataPermission::PagesRead).await?;
400    let pages = state.store.list_default_page_entries().await?;
401    Ok(Json(json!(pages)))
402}
403
404#[derive(Debug, Deserialize)]
405struct UpsertPageRequest {
406    payload: serde_json::Value,
407}
408
409async fn upsert_site_page(
410    Path(page_key): Path<String>,
411    State(state): State<Arc<AppState>>,
412    headers: HeaderMap,
413    Json(request): Json<UpsertPageRequest>,
414) -> Result<Json<serde_json::Value>, AppError> {
415    enforce_org_permission(&state, &headers, CedrosDataPermission::PagesWrite).await?;
416    let page = state
417        .store
418        .upsert_default_page_entry(&page_key, request.payload)
419        .await?;
420    Ok(Json(json!(page)))
421}
422
423#[allow(unused_variables)]
424pub(crate) async fn enforce_org_permission(
425    state: &Arc<AppState>,
426    headers: &HeaderMap,
427    permission: CedrosDataPermission,
428) -> Result<(), AppError> {
429    #[cfg(feature = "cedros-login-profile")]
430    if let Some(auth_state) = state.cedros_auth.as_ref() {
431        enforce_permission_with_cedros_login(auth_state, headers, permission)
432            .await
433            .map_err(map_authz_error)?;
434    }
435    Ok(())
436}
437
/// Converts a cedros-login authorization error into an `AppError` with the same status and
/// message, logging it first when the status is a server error (5xx).
// NOTE(review): the 5xx message is forwarded to the client verbatim, whereas the
// `CedrosDataError` conversion masks internal failures — confirm `AuthzError::message` is
// safe to expose.
#[cfg(feature = "cedros-login-profile")]
fn map_authz_error(error: AuthzError) -> AppError {
    let is_server_failure = error.status.is_server_error();
    if is_server_failure {
        error!(
            status = %error.status,
            detail = %error.message,
            "authorization request failed"
        );
    }

    AppError {
        status: error.status,
        message: error.message,
    }
}
449
450pub(crate) struct AppError {
451    status: StatusCode,
452    message: String,
453}
454
455impl AppError {
456    pub(crate) fn new(status: StatusCode, message: String) -> Self {
457        Self { status, message }
458    }
459}
460
461impl From<CedrosDataError> for AppError {
462    fn from(error: CedrosDataError) -> Self {
463        match error {
464            CedrosDataError::SiteNotConfigured | CedrosDataError::CollectionNotFound(_) => {
465                Self::new(StatusCode::NOT_FOUND, error.to_string())
466            }
467            CedrosDataError::BreakingSchemaChange(_)
468            | CedrosDataError::ContractVerificationFailed(_)
469            | CedrosDataError::InvalidIdentifier(_)
470            | CedrosDataError::InvalidRequest(_)
471            | CedrosDataError::InvalidTypeExpression(_) => {
472                Self::new(StatusCode::BAD_REQUEST, error.to_string())
473            }
474            CedrosDataError::MissingPostgresUri => {
475                Self::new(StatusCode::BAD_REQUEST, error.to_string())
476            }
477            CedrosDataError::Storage(_) => {
478                error!(detail = %error, "storage request failed");
479                Self::new(
480                    StatusCode::INTERNAL_SERVER_ERROR,
481                    "storage error".to_string(),
482                )
483            }
484            CedrosDataError::Database(_)
485            | CedrosDataError::Migration(_)
486            | CedrosDataError::Json(_)
487            | CedrosDataError::Io(_) => {
488                error!(detail = %error, "cedros-data request failed");
489                Self::new(
490                    StatusCode::INTERNAL_SERVER_ERROR,
491                    "internal server error".to_string(),
492                )
493            }
494        }
495    }
496}
497
498impl IntoResponse for AppError {
499    fn into_response(self) -> Response {
500        (self.status, Json(json!({ "error": self.message }))).into_response()
501    }
502}
503
504#[cfg(test)]
505mod tests;