1use std::sync::Arc;
2#[cfg(feature = "cedros-login-profile")]
3use std::time::Duration;
4
5use axum::{
6 extract::{Path, State},
7 http::{
8 header::{self, HeaderName, HeaderValue},
9 HeaderMap, Method, StatusCode,
10 },
11 response::{IntoResponse, Response},
12 routing::{get, post, put},
13 Json, Router,
14};
15use serde::Deserialize;
16use serde_json::json;
17use tokio::net::TcpListener;
18use tower_http::{
19 cors::{AllowOrigin, CorsLayer},
20 trace::TraceLayer,
21};
22use tracing::{error, info};
23
24use crate::config::HttpServerConfig;
25use crate::error::CedrosDataError;
26use crate::http_auth::CedrosDataPermission;
27#[cfg(feature = "cedros-login-profile")]
28use crate::http_auth::{
29 derive_permissions_url, enforce_permission_with_cedros_login, AuthzError, CedrosLoginState,
30};
31use crate::http_auth::{read_permission_for_collection, write_permission_for_collection};
32use crate::models::{
33 CollectionMode, ContractSchema, ExportSiteRequest, ImportSiteRequest, QueryEntriesRequest,
34 RegisterCollectionRequest, RegisterCustomSchemaRequest, RegisterSiteRequest,
35 UpsertEntryRequest, VerifyContractRequest,
36};
37use crate::store::CedrosData;
38
/// Shared state made available to every HTTP handler.
///
/// `build_router` wraps it in an `Arc` before installing it as the router state.
#[derive(Clone)]
pub(crate) struct AppState {
    /// Store that the handlers call for all data operations.
    pub(crate) store: CedrosData,
    /// cedros-login authorization settings. When this is `None`,
    /// `enforce_org_permission` performs no permission check.
    #[cfg(feature = "cedros-login-profile")]
    pub(crate) cedros_auth: Option<CedrosLoginState>,
    /// Optional storage client.
    // NOTE(review): presumably consumed by the `http_storage` handlers — confirm there.
    #[cfg(feature = "storage")]
    pub(crate) storage: Option<crate::storage::StorageClient>,
}
47
48pub async fn run_http_server(
49 store: CedrosData,
50 config: HttpServerConfig,
51 #[cfg(feature = "storage")] storage: Option<crate::storage::StorageClient>,
52) -> crate::error::Result<()> {
53 store.migrate().await?;
54 store.bootstrap_defaults().await?;
55 let app_state = build_state(
56 store,
57 &config,
58 #[cfg(feature = "storage")]
59 storage,
60 )?;
61 let app = build_router(app_state, &config)?;
62 let listener = TcpListener::bind(&config.bind_addr).await?;
63 info!(bind_addr = %config.bind_addr, "cedros-data HTTP server listening");
64 axum::serve(listener, app).await.map_err(Into::into)
65}
66
/// Value passed to `connect_timeout` on the HTTP client that `build_state`
/// creates for cedros-login (2 seconds).
#[cfg(feature = "cedros-login-profile")]
const CEDROS_LOGIN_CONNECT_TIMEOUT: Duration = Duration::from_secs(2);
/// Value passed to `timeout` on the HTTP client that `build_state` creates
/// for cedros-login (5 seconds).
#[cfg(feature = "cedros-login-profile")]
const CEDROS_LOGIN_REQUEST_TIMEOUT: Duration = Duration::from_secs(5);
71
72fn build_state(
73 store: CedrosData,
74 config: &HttpServerConfig,
75 #[cfg(feature = "storage")] storage: Option<crate::storage::StorageClient>,
76) -> crate::error::Result<AppState> {
77 validate_http_auth_configuration(config)?;
78
79 #[cfg(feature = "cedros-login-profile")]
80 {
81 let cedros_auth = if config.enable_cedros_login_profile {
82 let verify_url = config.cedros_login_verify_url.clone().ok_or_else(|| {
83 CedrosDataError::InvalidRequest(
84 "cedros_login_verify_url is required when compatibility profile is enabled"
85 .to_string(),
86 )
87 })?;
88 let permissions_url = derive_permissions_url(&verify_url)?;
89 let client = reqwest::Client::builder()
90 .connect_timeout(CEDROS_LOGIN_CONNECT_TIMEOUT)
91 .timeout(CEDROS_LOGIN_REQUEST_TIMEOUT)
92 .build()
93 .map_err(|error| {
94 CedrosDataError::InvalidRequest(format!(
95 "failed to build cedros-login HTTP client: {error}"
96 ))
97 })?;
98 Some(CedrosLoginState {
99 permissions_url,
100 client,
101 })
102 } else {
103 None
104 };
105
106 Ok(AppState {
107 store,
108 cedros_auth,
109 #[cfg(feature = "storage")]
110 storage,
111 })
112 }
113
114 #[cfg(not(feature = "cedros-login-profile"))]
115 {
116 if config.enable_cedros_login_profile {
117 return Err(CedrosDataError::InvalidRequest(
118 "cedros-login-profile feature is not enabled".to_string(),
119 ));
120 }
121 Ok(AppState {
122 store,
123 #[cfg(feature = "storage")]
124 storage,
125 })
126 }
127}
128
129fn validate_http_auth_configuration(config: &HttpServerConfig) -> crate::error::Result<()> {
130 if config.enable_cedros_login_profile || config.allow_unauthenticated_http {
131 return Ok(());
132 }
133
134 Err(CedrosDataError::InvalidRequest(
135 "embedded HTTP server requires authentication by default; pass --allow-unauthenticated for local development or enable --cedros-login-profile".to_string(),
136 ))
137}
138
139fn build_router(state: AppState, config: &HttpServerConfig) -> crate::error::Result<Router> {
140 #[allow(unused_mut)]
141 let mut app = Router::new()
142 .route("/migrate", post(migrate))
143 .route("/site", post(register_site))
144 .route("/collections", post(register_collection))
145 .route("/custom-schema", post(register_custom_schema))
146 .route("/entries/upsert", post(upsert_entry))
147 .route("/entries/query", post(query_entries))
148 .route("/site/export", get(export_site))
149 .route("/import", post(import_site))
150 .route("/contract/verify", post(verify_contract))
151 .route("/admin/default-pages", get(default_pages))
152 .route("/admin/bootstrap", post(bootstrap_site))
153 .route(
154 "/admin/collections",
155 get(list_site_collections).post(register_site_collection),
156 )
157 .route("/admin/pages", get(list_site_pages))
158 .route("/admin/pages/{page_key}", put(upsert_site_page))
159 .route("/site/config/{key}", get(crate::http_public::public_site_config));
161
162 #[cfg(feature = "storage")]
163 {
164 use crate::http_storage;
165 app = app
166 .route("/admin/storage/config", get(http_storage::storage_config))
167 .route("/admin/storage/test", post(http_storage::storage_test))
168 .route("/admin/media/upload", post(http_storage::media_upload))
169 .route("/admin/media/assets", get(http_storage::list_media_assets))
170 .route(
171 "/admin/media/assets/{asset_id}",
172 get(http_storage::get_media_asset)
173 .put(http_storage::update_media_asset)
174 .delete(http_storage::delete_media_asset),
175 );
176 }
177
178 let mut router = app
179 .layer(TraceLayer::new_for_http())
180 .with_state(Arc::new(state));
181
182 if let Some(cors_layer) = configured_cors_layer(config)? {
183 router = router.layer(cors_layer);
184 }
185
186 Ok(router)
187}
188
189fn configured_cors_layer(config: &HttpServerConfig) -> crate::error::Result<Option<CorsLayer>> {
190 if config.cors_allowed_origins.is_empty() {
191 return Ok(None);
192 }
193
194 let allow_origin = parse_cors_allowed_origins(&config.cors_allowed_origins)?;
195 let cors_layer = CorsLayer::new()
196 .allow_origin(allow_origin)
197 .allow_methods([Method::GET, Method::POST, Method::PUT, Method::OPTIONS])
198 .allow_headers([
199 header::AUTHORIZATION,
200 header::CONTENT_TYPE,
201 header::ACCEPT,
202 HeaderName::from_static("x-cedros-org-id"),
203 ]);
204
205 Ok(Some(cors_layer))
206}
207
208fn parse_cors_allowed_origins(origins: &[String]) -> crate::error::Result<AllowOrigin> {
209 if origins.iter().any(|origin| origin == "*") {
210 if origins.len() > 1 {
211 return Err(CedrosDataError::InvalidRequest(
212 "cors origins cannot mix '*' with explicit origins".to_string(),
213 ));
214 }
215
216 return Ok(AllowOrigin::any());
217 }
218
219 let header_values = origins
220 .iter()
221 .map(|origin| {
222 HeaderValue::from_str(origin).map_err(|error| {
223 CedrosDataError::InvalidRequest(format!(
224 "invalid cors origin \"{origin}\": {error}"
225 ))
226 })
227 })
228 .collect::<crate::error::Result<Vec<_>>>()?;
229
230 Ok(AllowOrigin::list(header_values))
231}
232
233async fn migrate(
234 State(state): State<Arc<AppState>>,
235 headers: HeaderMap,
236) -> Result<Json<serde_json::Value>, AppError> {
237 enforce_org_permission(&state, &headers, CedrosDataPermission::DataAdmin).await?;
238 state.store.migrate().await?;
239 Ok(Json(json!({"ok": true})))
240}
241
242async fn register_site(
243 State(state): State<Arc<AppState>>,
244 headers: HeaderMap,
245 Json(request): Json<RegisterSiteRequest>,
246) -> Result<Json<serde_json::Value>, AppError> {
247 enforce_org_permission(&state, &headers, CedrosDataPermission::DataAdmin).await?;
248 let site = state.store.register_site(request).await?;
249 Ok(Json(json!(site)))
250}
251
252async fn register_collection(
253 State(state): State<Arc<AppState>>,
254 headers: HeaderMap,
255 Json(request): Json<RegisterCollectionRequest>,
256) -> Result<Json<serde_json::Value>, AppError> {
257 enforce_org_permission(&state, &headers, CedrosDataPermission::CollectionsWrite).await?;
258 let collection = state.store.register_collection(request).await?;
259 Ok(Json(json!(collection)))
260}
261
262async fn register_custom_schema(
263 State(state): State<Arc<AppState>>,
264 headers: HeaderMap,
265 Json(request): Json<RegisterCustomSchemaRequest>,
266) -> Result<Json<serde_json::Value>, AppError> {
267 enforce_org_permission(&state, &headers, CedrosDataPermission::SchemaWrite).await?;
268 let report = state.store.register_custom_schema(request).await?;
269 Ok(Json(json!(report)))
270}
271
272async fn upsert_entry(
273 State(state): State<Arc<AppState>>,
274 headers: HeaderMap,
275 Json(request): Json<UpsertEntryRequest>,
276) -> Result<Json<serde_json::Value>, AppError> {
277 enforce_org_permission(
278 &state,
279 &headers,
280 write_permission_for_collection(&request.collection_name),
281 )
282 .await?;
283 let entry = state.store.upsert_entry(request).await?;
284 Ok(Json(json!(entry)))
285}
286
287async fn query_entries(
288 State(state): State<Arc<AppState>>,
289 headers: HeaderMap,
290 Json(request): Json<QueryEntriesRequest>,
291) -> Result<Json<serde_json::Value>, AppError> {
292 enforce_org_permission(
293 &state,
294 &headers,
295 read_permission_for_collection(&request.collection_name),
296 )
297 .await?;
298 let visitor_id = request.visitor_id.clone();
299 let record_reads = request.entry_keys.len() == 1;
300 let mut entries = state.store.query_entries(request).await?;
301
302 if let Some(ref vid) = visitor_id {
303 crate::store::metered_reads::validate_visitor_id(vid)?;
304 state
305 .store
306 .apply_content_gating(&mut entries, vid, record_reads)
307 .await?;
308 }
309
310 Ok(Json(json!(entries)))
311}
312
313async fn export_site(
314 State(state): State<Arc<AppState>>,
315 headers: HeaderMap,
316) -> Result<Json<serde_json::Value>, AppError> {
317 enforce_org_permission(&state, &headers, CedrosDataPermission::OpsWrite).await?;
318 let export = state.store.export_site(ExportSiteRequest).await?;
319 Ok(Json(json!(export)))
320}
321
322async fn import_site(
323 State(state): State<Arc<AppState>>,
324 headers: HeaderMap,
325 Json(request): Json<ImportSiteRequest>,
326) -> Result<Json<serde_json::Value>, AppError> {
327 enforce_org_permission(&state, &headers, CedrosDataPermission::OpsWrite).await?;
328 let result = state.store.import_site(request).await?;
329 Ok(Json(json!(result)))
330}
331
332async fn verify_contract(
333 State(state): State<Arc<AppState>>,
334 headers: HeaderMap,
335 Json(request): Json<VerifyContractRequest>,
336) -> Result<Json<serde_json::Value>, AppError> {
337 enforce_org_permission(&state, &headers, CedrosDataPermission::ContractRead).await?;
338 let report = state.store.verify_contract(request).await?;
339 Ok(Json(json!(report)))
340}
341
342async fn default_pages(
343 State(state): State<Arc<AppState>>,
344 headers: HeaderMap,
345) -> Result<Json<serde_json::Value>, AppError> {
346 enforce_org_permission(&state, &headers, CedrosDataPermission::PagesRead).await?;
347 Ok(Json(json!(crate::defaults::default_page_templates())))
348}
349
350async fn bootstrap_site(
351 State(state): State<Arc<AppState>>,
352 headers: HeaderMap,
353) -> Result<Json<serde_json::Value>, AppError> {
354 enforce_org_permission(&state, &headers, CedrosDataPermission::OpsWrite).await?;
355 let report = state.store.bootstrap_defaults().await?;
356 Ok(Json(json!(report)))
357}
358
359async fn list_site_collections(
360 State(state): State<Arc<AppState>>,
361 headers: HeaderMap,
362) -> Result<Json<serde_json::Value>, AppError> {
363 enforce_org_permission(&state, &headers, CedrosDataPermission::CollectionsRead).await?;
364 let collections = state.store.list_collections().await?;
365 Ok(Json(json!(collections)))
366}
367
/// JSON body accepted by `POST /admin/collections`.
///
/// `register_site_collection` converts it into a `RegisterCollectionRequest`.
#[derive(Debug, Deserialize)]
struct RegisterSiteCollectionRequest {
    /// Name of the collection to register.
    collection_name: String,
    /// Collection mode; the handler falls back to `CollectionMode::Jsonb` when omitted.
    mode: Option<CollectionMode>,
    /// Optional table name, forwarded unchanged.
    table_name: Option<String>,
    /// Optional contract schema, forwarded unchanged.
    strict_contract: Option<ContractSchema>,
}
375
376async fn register_site_collection(
377 State(state): State<Arc<AppState>>,
378 headers: HeaderMap,
379 Json(request): Json<RegisterSiteCollectionRequest>,
380) -> Result<Json<serde_json::Value>, AppError> {
381 enforce_org_permission(&state, &headers, CedrosDataPermission::CollectionsWrite).await?;
382 let collection = state
383 .store
384 .register_collection(RegisterCollectionRequest {
385 collection_name: request.collection_name,
386 mode: request.mode.unwrap_or(CollectionMode::Jsonb),
387 table_name: request.table_name,
388 strict_contract: request.strict_contract,
389 })
390 .await?;
391 Ok(Json(json!(collection)))
392}
393
394async fn list_site_pages(
395 State(state): State<Arc<AppState>>,
396 headers: HeaderMap,
397) -> Result<Json<serde_json::Value>, AppError> {
398 enforce_org_permission(&state, &headers, CedrosDataPermission::PagesRead).await?;
399 let pages = state.store.list_default_page_entries().await?;
400 Ok(Json(json!(pages)))
401}
402
/// JSON body accepted by `PUT /admin/pages/{page_key}`.
#[derive(Debug, Deserialize)]
struct UpsertPageRequest {
    /// Arbitrary JSON payload that `upsert_site_page` passes to the store for the page.
    payload: serde_json::Value,
}
407
408async fn upsert_site_page(
409 Path(page_key): Path<String>,
410 State(state): State<Arc<AppState>>,
411 headers: HeaderMap,
412 Json(request): Json<UpsertPageRequest>,
413) -> Result<Json<serde_json::Value>, AppError> {
414 enforce_org_permission(&state, &headers, CedrosDataPermission::PagesWrite).await?;
415 let page = state
416 .store
417 .upsert_default_page_entry(&page_key, request.payload)
418 .await?;
419 Ok(Json(json!(page)))
420}
421
422#[allow(unused_variables)]
423pub(crate) async fn enforce_org_permission(
424 state: &Arc<AppState>,
425 headers: &HeaderMap,
426 permission: CedrosDataPermission,
427) -> Result<(), AppError> {
428 #[cfg(feature = "cedros-login-profile")]
429 if let Some(auth_state) = state.cedros_auth.as_ref() {
430 enforce_permission_with_cedros_login(auth_state, headers, permission)
431 .await
432 .map_err(map_authz_error)?;
433 }
434 Ok(())
435}
436
/// Converts a cedros-login authorization failure into an [`AppError`] that
/// carries the same status and message.
///
/// Failures with a 5xx status are logged before conversion; other statuses
/// are passed through without logging.
#[cfg(feature = "cedros-login-profile")]
fn map_authz_error(error: AuthzError) -> AppError {
    let is_upstream_failure = error.status.is_server_error();
    if is_upstream_failure {
        error!(
            status = %error.status,
            detail = %error.message,
            "authorization request failed"
        );
    }

    let AuthzError {
        status, message, ..
    } = error;
    AppError::new(status, message)
}
448
/// Error type returned by the HTTP handlers.
///
/// Its `IntoResponse` implementation renders it as the stored status with a
/// JSON body of the form `{"error": message}`.
pub(crate) struct AppError {
    /// HTTP status sent with the response.
    status: StatusCode,
    /// Text placed in the `error` field of the JSON response body.
    message: String,
}
453
454impl AppError {
455 pub(crate) fn new(status: StatusCode, message: String) -> Self {
456 Self { status, message }
457 }
458}
459
460impl From<CedrosDataError> for AppError {
461 fn from(error: CedrosDataError) -> Self {
462 match error {
463 CedrosDataError::SiteNotConfigured | CedrosDataError::CollectionNotFound(_) => {
464 Self::new(StatusCode::NOT_FOUND, error.to_string())
465 }
466 CedrosDataError::BreakingSchemaChange(_)
467 | CedrosDataError::ContractVerificationFailed(_)
468 | CedrosDataError::InvalidIdentifier(_)
469 | CedrosDataError::InvalidRequest(_)
470 | CedrosDataError::InvalidTypeExpression(_) => {
471 Self::new(StatusCode::BAD_REQUEST, error.to_string())
472 }
473 CedrosDataError::MissingPostgresUri => {
474 Self::new(StatusCode::BAD_REQUEST, error.to_string())
475 }
476 CedrosDataError::Storage(_) => {
477 error!(detail = %error, "storage request failed");
478 Self::new(
479 StatusCode::INTERNAL_SERVER_ERROR,
480 "storage error".to_string(),
481 )
482 }
483 CedrosDataError::Database(_)
484 | CedrosDataError::Migration(_)
485 | CedrosDataError::Json(_)
486 | CedrosDataError::Io(_) => {
487 error!(detail = %error, "cedros-data request failed");
488 Self::new(
489 StatusCode::INTERNAL_SERVER_ERROR,
490 "internal server error".to_string(),
491 )
492 }
493 }
494 }
495}
496
497impl IntoResponse for AppError {
498 fn into_response(self) -> Response {
499 (self.status, Json(json!({ "error": self.message }))).into_response()
500 }
501}
502
503#[cfg(test)]
504mod tests;