1use super::{Json, Path, RegistryHeader};
2use crate::{
3 datastore::{DataStoreError, RecordStatus},
4 policy::{
5 content::{ContentPolicy, ContentPolicyError},
6 record::{RecordPolicy, RecordPolicyError},
7 },
8 services::CoreService,
9};
10use axum::{
11 body::{Body, BodyDataStream},
12 debug_handler,
13 extract::State,
14 http::StatusCode,
15 response::IntoResponse,
16 routing::{get, post},
17 Router,
18};
19use futures::StreamExt;
20use indexmap::IndexMap;
21use std::path::PathBuf;
22use std::sync::Arc;
23use tempfile::NamedTempFile;
24use tokio::io::AsyncWriteExt;
25use warg_api::v1::package::{
26 MissingContent, PackageError, PackageRecord, PackageRecordState, PublishRecordRequest,
27 UploadEndpoint,
28};
29use warg_crypto::hash::{AnyHash, Sha256};
30use warg_protocol::{
31 package,
32 registry::{LogId, RecordId},
33 ProtoEnvelope, Record as _,
34};
35
36#[derive(Clone)]
37pub struct Config {
38 core_service: CoreService,
39 files_dir: PathBuf,
40 temp_dir: PathBuf,
41 content_policy: Option<Arc<dyn ContentPolicy>>,
42 record_policy: Option<Arc<dyn RecordPolicy>>,
43}
44
45impl Config {
46 pub fn new(
47 core_service: CoreService,
48 files_dir: PathBuf,
49 temp_dir: PathBuf,
50 content_policy: Option<Arc<dyn ContentPolicy>>,
51 record_policy: Option<Arc<dyn RecordPolicy>>,
52 ) -> Self {
53 Self {
54 core_service,
55 files_dir,
56 temp_dir,
57 content_policy,
58 record_policy,
59 }
60 }
61
62 pub fn into_router(self) -> Router {
63 Router::new()
64 .route("/:log_id/record", post(publish_record))
65 .route("/:log_id/record/:record_id", get(get_record))
66 .route(
67 "/:log_id/record/:record_id/content/:digest",
68 post(upload_content),
69 )
70 .with_state(self)
71 }
72
73 fn content_present(&self, digest: &AnyHash) -> bool {
74 self.content_path(digest).is_file()
75 }
76
77 fn content_file_name(&self, digest: &AnyHash) -> String {
78 digest.to_string().replace(':', "-")
79 }
80
81 fn content_path(&self, digest: &AnyHash) -> PathBuf {
82 self.files_dir.join(self.content_file_name(digest))
83 }
84
85 fn build_missing_content<'a>(
86 &self,
87 log_id: &LogId,
88 record_id: &RecordId,
89 missing_digests: impl IntoIterator<Item = &'a AnyHash>,
90 ) -> IndexMap<AnyHash, MissingContent> {
91 missing_digests
92 .into_iter()
93 .map(|digest| {
94 let url = format!("v1/package/{log_id}/record/{record_id}/content/{digest}");
95 (
96 digest.clone(),
97 MissingContent {
98 upload: vec![UploadEndpoint::Http {
99 method: "POST".to_string(),
100 url,
101 headers: IndexMap::new(),
102 }],
103 },
104 )
105 })
106 .collect()
107 }
108}
109
110struct PackageApiError(PackageError);
111
112impl PackageApiError {
113 fn bad_request(message: impl ToString) -> Self {
114 Self(PackageError::Message {
115 status: StatusCode::BAD_REQUEST.as_u16(),
116 message: message.to_string(),
117 })
118 }
119
120 fn internal_error(e: impl std::fmt::Display) -> Self {
121 tracing::error!("unexpected error: {e}");
122 Self(PackageError::Message {
123 status: StatusCode::INTERNAL_SERVER_ERROR.as_u16(),
124 message: "an error occurred while processing the request".into(),
125 })
126 }
127
128 fn unsupported(message: impl ToString) -> Self {
129 Self(PackageError::Message {
130 status: StatusCode::NOT_IMPLEMENTED.as_u16(),
131 message: message.to_string(),
132 })
133 }
134}
135
136impl From<DataStoreError> for PackageApiError {
137 fn from(e: DataStoreError) -> Self {
138 Self(match e {
139 DataStoreError::PackageValidationFailed(e) => {
140 return Self::bad_request(e);
141 }
142 DataStoreError::LogNotFound(id) => PackageError::LogNotFound(id),
143 DataStoreError::RecordNotFound(id) => PackageError::RecordNotFound(id),
144 DataStoreError::UnknownKey(_) | DataStoreError::SignatureVerificationFailed(_) => {
145 PackageError::Unauthorized(e.to_string())
146 }
147 DataStoreError::PackageNamespaceNotDefined(id) => PackageError::NamespaceNotDefined(id),
148 DataStoreError::PackageNamespaceImported(id) => PackageError::NamespaceImported(id),
149 e => {
151 tracing::error!("unexpected data store error: {e}");
152 PackageError::Message {
153 status: StatusCode::INTERNAL_SERVER_ERROR.as_u16(),
154 message: "an error occurred while processing the request".into(),
155 }
156 }
157 })
158 }
159}
160
161impl From<ContentPolicyError> for PackageApiError {
162 fn from(e: ContentPolicyError) -> Self {
163 match e {
164 ContentPolicyError::Rejection(message) => Self(PackageError::Rejection(message)),
165 }
166 }
167}
168
169impl From<RecordPolicyError> for PackageApiError {
170 fn from(e: RecordPolicyError) -> Self {
171 match e {
172 RecordPolicyError::Unauthorized(message) => Self(PackageError::Unauthorized(message)),
173 RecordPolicyError::Rejection(message) => Self(PackageError::Rejection(message)),
174 }
175 }
176}
177
178impl IntoResponse for PackageApiError {
179 fn into_response(self) -> axum::response::Response {
180 (StatusCode::from_u16(self.0.status()).unwrap(), Json(self.0)).into_response()
181 }
182}
183
184#[debug_handler]
185async fn publish_record(
186 State(config): State<Config>,
187 Path(log_id): Path<LogId>,
188 RegistryHeader(_registry_header): RegistryHeader,
189 Json(body): Json<PublishRecordRequest<'static>>,
190) -> Result<impl IntoResponse, PackageApiError> {
191 let expected_log_id = LogId::package_log::<Sha256>(&body.package_name);
192 if expected_log_id != log_id {
193 return Err(PackageApiError::bad_request(format!(
194 "package log identifier `{expected_log_id}` derived from `{name}` does not match provided log identifier `{log_id}`",
195 name = body.package_name
196 )));
197 }
198
199 let record: ProtoEnvelope<package::PackageRecord> = body
200 .record
201 .into_owned()
202 .try_into()
203 .map_err(PackageApiError::bad_request)?;
204
205 if !body.content_sources.is_empty() {
207 return Err(PackageApiError::unsupported(
208 "specifying content sources is not supported",
209 ));
210 }
211
212 config
216 .core_service
217 .store()
218 .verify_can_publish_package(&LogId::operator_log::<Sha256>(), &body.package_name)
219 .await?;
220
221 if let Some(policy) = &config.record_policy {
224 policy.check(&body.package_name, &record)?;
225 }
226
227 config
229 .core_service
230 .store()
231 .verify_package_record_signature(&log_id, &record)
232 .await?;
233
234 let record_id = RecordId::package_record::<Sha256>(&record);
235 let mut missing = record.as_ref().contents();
236 missing.retain(|d| !config.content_present(d));
237
238 config
239 .core_service
240 .store()
241 .store_package_record(&log_id, &body.package_name, &record_id, &record, &missing)
242 .await?;
243
244 if missing.is_empty() {
246 config
247 .core_service
248 .submit_package_record(log_id, record_id.clone())
249 .await;
250
251 return Ok((
252 StatusCode::ACCEPTED,
253 Json(PackageRecord {
254 record_id,
255 state: PackageRecordState::Processing,
256 }),
257 ));
258 }
259
260 let missing_content = config.build_missing_content(&log_id, &record_id, missing);
261 Ok((
262 StatusCode::ACCEPTED,
263 Json(PackageRecord {
264 record_id,
265 state: PackageRecordState::Sourcing { missing_content },
266 }),
267 ))
268}
269
270#[debug_handler]
271async fn get_record(
272 State(config): State<Config>,
273 Path((log_id, record_id)): Path<(LogId, RecordId)>,
274 RegistryHeader(_registry_header): RegistryHeader,
275) -> Result<Json<PackageRecord>, PackageApiError> {
276 let record = config
277 .core_service
278 .store()
279 .get_package_record(&log_id, &record_id)
280 .await?;
281
282 match record.status {
283 RecordStatus::MissingContent(missing) => {
284 let missing_content = config.build_missing_content(&log_id, &record_id, &missing);
285 Ok(Json(PackageRecord {
286 record_id,
287 state: PackageRecordState::Sourcing { missing_content },
288 }))
289 }
290 RecordStatus::Pending | RecordStatus::Validated => Ok(Json(PackageRecord {
292 record_id,
293 state: PackageRecordState::Processing,
294 })),
295 RecordStatus::Rejected(reason) => Ok(Json(PackageRecord {
296 record_id,
297 state: PackageRecordState::Rejected { reason },
298 })),
299 RecordStatus::Published => {
300 let registry_index = record.registry_index.unwrap();
301
302 Ok(Json(PackageRecord {
303 record_id,
304 state: PackageRecordState::Published { registry_index },
305 }))
306 }
307 }
308}
309
310#[debug_handler]
311async fn upload_content(
312 State(config): State<Config>,
313 Path((log_id, record_id, digest)): Path<(LogId, RecordId, AnyHash)>,
314 RegistryHeader(_registry_header): RegistryHeader,
315 body: Body,
316) -> Result<impl IntoResponse, PackageApiError> {
317 match config
318 .core_service
319 .store()
320 .is_content_missing(&log_id, &record_id, &digest)
321 .await
322 {
323 Ok(true) => {}
324 Ok(false) => {
325 return Err(PackageApiError::bad_request(
326 "content digest `{digest}` is not required for package record `{record_id}`",
327 ));
328 }
329 Err(DataStoreError::RecordNotPending(_)) => {
330 return Err(PackageApiError(PackageError::RecordNotSourcing))
331 }
332 Err(e) => return Err(e.into()),
333 }
334
335 let tmp_path = NamedTempFile::new_in(&config.temp_dir)
336 .map_err(PackageApiError::internal_error)?
337 .into_temp_path();
338
339 tracing::debug!(
340 "uploading content for record `{record_id}` from `{log_id}` to `{path}`",
341 path = tmp_path.display()
342 );
343
344 let res = process_content(
345 &tmp_path,
346 &digest,
347 body.into_data_stream(),
348 config.content_policy.as_deref(),
349 )
350 .await;
351
352 if let Err(PackageApiError(PackageError::Rejection(reason))) = &res {
354 config
355 .core_service
356 .store()
357 .reject_package_record(
358 &log_id,
359 &record_id,
360 &format!("content with digest `{digest}` was rejected by policy: {reason}"),
361 )
362 .await?;
363 }
364
365 res?;
367
368 tmp_path
369 .persist(config.content_path(&digest))
370 .map_err(PackageApiError::internal_error)?;
371
372 if config
374 .core_service
375 .store()
376 .set_content_present(&log_id, &record_id, &digest)
377 .await?
378 {
379 config
380 .core_service
381 .submit_package_record(log_id, record_id.clone())
382 .await;
383 }
384
385 Ok(StatusCode::CREATED)
386}
387
388async fn process_content(
389 path: &std::path::Path,
390 digest: &AnyHash,
391 mut stream: BodyDataStream,
392 policy: Option<&dyn ContentPolicy>,
393) -> Result<(), PackageApiError> {
394 let mut tmp_file = tokio::fs::File::create(&path)
395 .await
396 .map_err(PackageApiError::internal_error)?;
397
398 let mut hasher = digest.algorithm().hasher();
399 let mut policy = policy.map(|p| p.new_stream_policy(digest)).transpose()?;
400
401 while let Some(chunk) = stream
402 .next()
403 .await
404 .transpose()
405 .map_err(PackageApiError::internal_error)?
406 {
407 if let Some(policy) = policy.as_mut() {
408 policy.check(&chunk)?;
409 }
410
411 hasher.update(&chunk);
412 tmp_file
413 .write_all(&chunk)
414 .await
415 .map_err(PackageApiError::internal_error)?;
416 }
417
418 let result = hasher.finalize();
419 if &result != digest {
420 return Err(PackageApiError::bad_request(format!(
421 "content digest `{result}` does not match expected digest `{digest}`",
422 )));
423 }
424
425 if let Some(mut policy) = policy {
426 policy.finalize()?;
427 }
428
429 Ok(())
430}