warg_server/api/v1/package.rs

1use super::{Json, Path, RegistryHeader};
2use crate::{
3    datastore::{DataStoreError, RecordStatus},
4    policy::{
5        content::{ContentPolicy, ContentPolicyError},
6        record::{RecordPolicy, RecordPolicyError},
7    },
8    services::CoreService,
9};
10use axum::{
11    body::{Body, BodyDataStream},
12    debug_handler,
13    extract::State,
14    http::StatusCode,
15    response::IntoResponse,
16    routing::{get, post},
17    Router,
18};
19use futures::StreamExt;
20use indexmap::IndexMap;
21use std::path::PathBuf;
22use std::sync::Arc;
23use tempfile::NamedTempFile;
24use tokio::io::AsyncWriteExt;
25use warg_api::v1::package::{
26    MissingContent, PackageError, PackageRecord, PackageRecordState, PublishRecordRequest,
27    UploadEndpoint,
28};
29use warg_crypto::hash::{AnyHash, Sha256};
30use warg_protocol::{
31    package,
32    registry::{LogId, RecordId},
33    ProtoEnvelope, Record as _,
34};
35
36#[derive(Clone)]
37pub struct Config {
38    core_service: CoreService,
39    files_dir: PathBuf,
40    temp_dir: PathBuf,
41    content_policy: Option<Arc<dyn ContentPolicy>>,
42    record_policy: Option<Arc<dyn RecordPolicy>>,
43}
44
45impl Config {
46    pub fn new(
47        core_service: CoreService,
48        files_dir: PathBuf,
49        temp_dir: PathBuf,
50        content_policy: Option<Arc<dyn ContentPolicy>>,
51        record_policy: Option<Arc<dyn RecordPolicy>>,
52    ) -> Self {
53        Self {
54            core_service,
55            files_dir,
56            temp_dir,
57            content_policy,
58            record_policy,
59        }
60    }
61
62    pub fn into_router(self) -> Router {
63        Router::new()
64            .route("/:log_id/record", post(publish_record))
65            .route("/:log_id/record/:record_id", get(get_record))
66            .route(
67                "/:log_id/record/:record_id/content/:digest",
68                post(upload_content),
69            )
70            .with_state(self)
71    }
72
73    fn content_present(&self, digest: &AnyHash) -> bool {
74        self.content_path(digest).is_file()
75    }
76
77    fn content_file_name(&self, digest: &AnyHash) -> String {
78        digest.to_string().replace(':', "-")
79    }
80
81    fn content_path(&self, digest: &AnyHash) -> PathBuf {
82        self.files_dir.join(self.content_file_name(digest))
83    }
84
85    fn build_missing_content<'a>(
86        &self,
87        log_id: &LogId,
88        record_id: &RecordId,
89        missing_digests: impl IntoIterator<Item = &'a AnyHash>,
90    ) -> IndexMap<AnyHash, MissingContent> {
91        missing_digests
92            .into_iter()
93            .map(|digest| {
94                let url = format!("v1/package/{log_id}/record/{record_id}/content/{digest}");
95                (
96                    digest.clone(),
97                    MissingContent {
98                        upload: vec![UploadEndpoint::Http {
99                            method: "POST".to_string(),
100                            url,
101                            headers: IndexMap::new(),
102                        }],
103                    },
104                )
105            })
106            .collect()
107    }
108}
109
110struct PackageApiError(PackageError);
111
112impl PackageApiError {
113    fn bad_request(message: impl ToString) -> Self {
114        Self(PackageError::Message {
115            status: StatusCode::BAD_REQUEST.as_u16(),
116            message: message.to_string(),
117        })
118    }
119
120    fn internal_error(e: impl std::fmt::Display) -> Self {
121        tracing::error!("unexpected error: {e}");
122        Self(PackageError::Message {
123            status: StatusCode::INTERNAL_SERVER_ERROR.as_u16(),
124            message: "an error occurred while processing the request".into(),
125        })
126    }
127
128    fn unsupported(message: impl ToString) -> Self {
129        Self(PackageError::Message {
130            status: StatusCode::NOT_IMPLEMENTED.as_u16(),
131            message: message.to_string(),
132        })
133    }
134}
135
136impl From<DataStoreError> for PackageApiError {
137    fn from(e: DataStoreError) -> Self {
138        Self(match e {
139            DataStoreError::PackageValidationFailed(e) => {
140                return Self::bad_request(e);
141            }
142            DataStoreError::LogNotFound(id) => PackageError::LogNotFound(id),
143            DataStoreError::RecordNotFound(id) => PackageError::RecordNotFound(id),
144            DataStoreError::UnknownKey(_) | DataStoreError::SignatureVerificationFailed(_) => {
145                PackageError::Unauthorized(e.to_string())
146            }
147            DataStoreError::PackageNamespaceNotDefined(id) => PackageError::NamespaceNotDefined(id),
148            DataStoreError::PackageNamespaceImported(id) => PackageError::NamespaceImported(id),
149            // Other errors are internal server errors
150            e => {
151                tracing::error!("unexpected data store error: {e}");
152                PackageError::Message {
153                    status: StatusCode::INTERNAL_SERVER_ERROR.as_u16(),
154                    message: "an error occurred while processing the request".into(),
155                }
156            }
157        })
158    }
159}
160
161impl From<ContentPolicyError> for PackageApiError {
162    fn from(e: ContentPolicyError) -> Self {
163        match e {
164            ContentPolicyError::Rejection(message) => Self(PackageError::Rejection(message)),
165        }
166    }
167}
168
169impl From<RecordPolicyError> for PackageApiError {
170    fn from(e: RecordPolicyError) -> Self {
171        match e {
172            RecordPolicyError::Unauthorized(message) => Self(PackageError::Unauthorized(message)),
173            RecordPolicyError::Rejection(message) => Self(PackageError::Rejection(message)),
174        }
175    }
176}
177
178impl IntoResponse for PackageApiError {
179    fn into_response(self) -> axum::response::Response {
180        (StatusCode::from_u16(self.0.status()).unwrap(), Json(self.0)).into_response()
181    }
182}
183
184#[debug_handler]
185async fn publish_record(
186    State(config): State<Config>,
187    Path(log_id): Path<LogId>,
188    RegistryHeader(_registry_header): RegistryHeader,
189    Json(body): Json<PublishRecordRequest<'static>>,
190) -> Result<impl IntoResponse, PackageApiError> {
191    let expected_log_id = LogId::package_log::<Sha256>(&body.package_name);
192    if expected_log_id != log_id {
193        return Err(PackageApiError::bad_request(format!(
194            "package log identifier `{expected_log_id}` derived from `{name}` does not match provided log identifier `{log_id}`",
195            name = body.package_name
196        )));
197    }
198
199    let record: ProtoEnvelope<package::PackageRecord> = body
200        .record
201        .into_owned()
202        .try_into()
203        .map_err(PackageApiError::bad_request)?;
204
205    // Specifying content sources is not allowed in this implementation
206    if !body.content_sources.is_empty() {
207        return Err(PackageApiError::unsupported(
208            "specifying content sources is not supported",
209        ));
210    }
211
212    // Verify the package name is unique in a case insensitive way and
213    // the namespace is defined in the operator log and not imported
214    // from another registry.
215    config
216        .core_service
217        .store()
218        .verify_can_publish_package(&LogId::operator_log::<Sha256>(), &body.package_name)
219        .await?;
220
221    // Preemptively perform the policy check on the record before storing it
222    // This is performed here so that we never store an unauthorized record
223    if let Some(policy) = &config.record_policy {
224        policy.check(&body.package_name, &record)?;
225    }
226
227    // Verify the signature on the record itself before storing it
228    config
229        .core_service
230        .store()
231        .verify_package_record_signature(&log_id, &record)
232        .await?;
233
234    let record_id = RecordId::package_record::<Sha256>(&record);
235    let mut missing = record.as_ref().contents();
236    missing.retain(|d| !config.content_present(d));
237
238    config
239        .core_service
240        .store()
241        .store_package_record(&log_id, &body.package_name, &record_id, &record, &missing)
242        .await?;
243
244    // If there's no missing content, submit the record for processing now
245    if missing.is_empty() {
246        config
247            .core_service
248            .submit_package_record(log_id, record_id.clone())
249            .await;
250
251        return Ok((
252            StatusCode::ACCEPTED,
253            Json(PackageRecord {
254                record_id,
255                state: PackageRecordState::Processing,
256            }),
257        ));
258    }
259
260    let missing_content = config.build_missing_content(&log_id, &record_id, missing);
261    Ok((
262        StatusCode::ACCEPTED,
263        Json(PackageRecord {
264            record_id,
265            state: PackageRecordState::Sourcing { missing_content },
266        }),
267    ))
268}
269
270#[debug_handler]
271async fn get_record(
272    State(config): State<Config>,
273    Path((log_id, record_id)): Path<(LogId, RecordId)>,
274    RegistryHeader(_registry_header): RegistryHeader,
275) -> Result<Json<PackageRecord>, PackageApiError> {
276    let record = config
277        .core_service
278        .store()
279        .get_package_record(&log_id, &record_id)
280        .await?;
281
282    match record.status {
283        RecordStatus::MissingContent(missing) => {
284            let missing_content = config.build_missing_content(&log_id, &record_id, &missing);
285            Ok(Json(PackageRecord {
286                record_id,
287                state: PackageRecordState::Sourcing { missing_content },
288            }))
289        }
290        // Validated is considered still processing until included in a checkpoint
291        RecordStatus::Pending | RecordStatus::Validated => Ok(Json(PackageRecord {
292            record_id,
293            state: PackageRecordState::Processing,
294        })),
295        RecordStatus::Rejected(reason) => Ok(Json(PackageRecord {
296            record_id,
297            state: PackageRecordState::Rejected { reason },
298        })),
299        RecordStatus::Published => {
300            let registry_index = record.registry_index.unwrap();
301
302            Ok(Json(PackageRecord {
303                record_id,
304                state: PackageRecordState::Published { registry_index },
305            }))
306        }
307    }
308}
309
310#[debug_handler]
311async fn upload_content(
312    State(config): State<Config>,
313    Path((log_id, record_id, digest)): Path<(LogId, RecordId, AnyHash)>,
314    RegistryHeader(_registry_header): RegistryHeader,
315    body: Body,
316) -> Result<impl IntoResponse, PackageApiError> {
317    match config
318        .core_service
319        .store()
320        .is_content_missing(&log_id, &record_id, &digest)
321        .await
322    {
323        Ok(true) => {}
324        Ok(false) => {
325            return Err(PackageApiError::bad_request(
326                "content digest `{digest}` is not required for package record `{record_id}`",
327            ));
328        }
329        Err(DataStoreError::RecordNotPending(_)) => {
330            return Err(PackageApiError(PackageError::RecordNotSourcing))
331        }
332        Err(e) => return Err(e.into()),
333    }
334
335    let tmp_path = NamedTempFile::new_in(&config.temp_dir)
336        .map_err(PackageApiError::internal_error)?
337        .into_temp_path();
338
339    tracing::debug!(
340        "uploading content for record `{record_id}` from `{log_id}` to `{path}`",
341        path = tmp_path.display()
342    );
343
344    let res = process_content(
345        &tmp_path,
346        &digest,
347        body.into_data_stream(),
348        config.content_policy.as_deref(),
349    )
350    .await;
351
352    // If the error was a rejection, transition the record itself to rejected
353    if let Err(PackageApiError(PackageError::Rejection(reason))) = &res {
354        config
355            .core_service
356            .store()
357            .reject_package_record(
358                &log_id,
359                &record_id,
360                &format!("content with digest `{digest}` was rejected by policy: {reason}"),
361            )
362            .await?;
363    }
364
365    // Only persist the file if the content was successfully processed
366    res?;
367
368    tmp_path
369        .persist(config.content_path(&digest))
370        .map_err(PackageApiError::internal_error)?;
371
372    // If this is the last content needed, submit the record for processing now
373    if config
374        .core_service
375        .store()
376        .set_content_present(&log_id, &record_id, &digest)
377        .await?
378    {
379        config
380            .core_service
381            .submit_package_record(log_id, record_id.clone())
382            .await;
383    }
384
385    Ok(StatusCode::CREATED)
386}
387
388async fn process_content(
389    path: &std::path::Path,
390    digest: &AnyHash,
391    mut stream: BodyDataStream,
392    policy: Option<&dyn ContentPolicy>,
393) -> Result<(), PackageApiError> {
394    let mut tmp_file = tokio::fs::File::create(&path)
395        .await
396        .map_err(PackageApiError::internal_error)?;
397
398    let mut hasher = digest.algorithm().hasher();
399    let mut policy = policy.map(|p| p.new_stream_policy(digest)).transpose()?;
400
401    while let Some(chunk) = stream
402        .next()
403        .await
404        .transpose()
405        .map_err(PackageApiError::internal_error)?
406    {
407        if let Some(policy) = policy.as_mut() {
408            policy.check(&chunk)?;
409        }
410
411        hasher.update(&chunk);
412        tmp_file
413            .write_all(&chunk)
414            .await
415            .map_err(PackageApiError::internal_error)?;
416    }
417
418    let result = hasher.finalize();
419    if &result != digest {
420        return Err(PackageApiError::bad_request(format!(
421            "content digest `{result}` does not match expected digest `{digest}`",
422        )));
423    }
424
425    if let Some(mut policy) = policy {
426        policy.finalize()?;
427    }
428
429    Ok(())
430}