warg_server/api/v1/
fetch.rs

1use super::{Json, RegistryHeader};
2use crate::datastore::DataStoreError;
3use crate::services::CoreService;
4use axum::http::StatusCode;
5use axum::{
6    debug_handler,
7    extract::State,
8    response::IntoResponse,
9    routing::{get, post},
10    Router,
11};
12use indexmap::IndexMap;
13use warg_api::v1::fetch::{
14    FetchError, FetchLogsRequest, FetchLogsResponse, FetchPackageNamesRequest,
15    FetchPackageNamesResponse, PublishedRecord,
16};
17use warg_crypto::hash::{AnyHash, Sha256};
18use warg_protocol::registry::{LogId, RecordId, TimestampedCheckpoint};
19use warg_protocol::SerdeEnvelope;
20
/// Number of records returned per log when a fetch request omits `limit`.
const DEFAULT_RECORDS_LIMIT: u16 = 100;
/// Largest `limit` a fetch request may ask for; larger values are rejected.
const MAX_RECORDS_LIMIT: u16 = 1_000;

/// Maximum number of log IDs looked up by a single package-names request.
const MAX_PACKAGE_NAMES_LIMIT: usize = 1_000;
25
26#[derive(Clone)]
27pub struct Config {
28    core_service: CoreService,
29}
30
31impl Config {
32    pub fn new(core_service: CoreService) -> Self {
33        Self { core_service }
34    }
35
36    pub fn into_router(self) -> Router {
37        Router::new()
38            .route("/checkpoint", get(fetch_checkpoint))
39            .route("/logs", post(fetch_logs))
40            .route("/names", post(fetch_package_names))
41            .with_state(self)
42    }
43}
44
45struct FetchApiError(FetchError);
46
47impl FetchApiError {
48    fn bad_request(message: impl ToString) -> Self {
49        Self(FetchError::Message {
50            status: StatusCode::BAD_REQUEST.as_u16(),
51            message: message.to_string(),
52        })
53    }
54}
55
56impl From<DataStoreError> for FetchApiError {
57    fn from(e: DataStoreError) -> Self {
58        Self(match e {
59            DataStoreError::CheckpointNotFound(checkpoint) => {
60                FetchError::CheckpointNotFound(checkpoint)
61            }
62            DataStoreError::LogNotFound(log_id) => FetchError::LogNotFound(log_id),
63            DataStoreError::RecordNotFound(record_id) => {
64                FetchError::FetchTokenNotFound(record_id.to_string())
65            }
66            // Other errors are internal server errors
67            e => {
68                tracing::error!("unexpected data store error: {e}");
69                FetchError::Message {
70                    status: StatusCode::INTERNAL_SERVER_ERROR.as_u16(),
71                    message: "an error occurred while processing the request".into(),
72                }
73            }
74        })
75    }
76}
77
78impl IntoResponse for FetchApiError {
79    fn into_response(self) -> axum::response::Response {
80        (StatusCode::from_u16(self.0.status()).unwrap(), Json(self.0)).into_response()
81    }
82}
83
84#[debug_handler]
85async fn fetch_logs(
86    State(config): State<Config>,
87    RegistryHeader(_registry_header): RegistryHeader,
88    Json(body): Json<FetchLogsRequest<'static>>,
89) -> Result<Json<FetchLogsResponse>, FetchApiError> {
90    let limit = body.limit.unwrap_or(DEFAULT_RECORDS_LIMIT);
91    if limit == 0 || limit > MAX_RECORDS_LIMIT {
92        return Err(FetchApiError::bad_request(format!(
93            "invalid records limit value `{limit}`: must be between 1 and {MAX_RECORDS_LIMIT}"
94        )));
95    }
96
97    let operator_fetch_token: Option<RecordId> = match body.operator {
98        Some(s) => Some(
99            s.parse::<AnyHash>()
100                .map_err(|_| FetchApiError(FetchError::FetchTokenNotFound(s.into_owned())))?
101                .into(),
102        ),
103        None => None,
104    };
105    let operator: Vec<PublishedRecord> = config
106        .core_service
107        .store()
108        .get_operator_records(
109            &LogId::operator_log::<Sha256>(),
110            body.log_length,
111            operator_fetch_token.as_ref(),
112            limit,
113        )
114        .await?
115        .into_iter()
116        .map(|envelope| {
117            // use the record ID as the fetch token
118            let fetch_token = RecordId::operator_record::<Sha256>(&envelope.envelope).to_string();
119            PublishedRecord {
120                envelope: envelope.into(),
121                fetch_token,
122            }
123        })
124        .collect();
125
126    let mut more = operator.len() == limit as usize;
127
128    let mut map = IndexMap::new();
129    let packages = body.packages.into_owned();
130    for (id, fetch_token) in packages {
131        let since: Option<RecordId> = match fetch_token {
132            Some(s) => Some(
133                s.parse::<AnyHash>()
134                    .map_err(|_| FetchApiError(FetchError::FetchTokenNotFound(s)))?
135                    .into(),
136            ),
137            None => None,
138        };
139        let records: Vec<PublishedRecord> = config
140            .core_service
141            .store()
142            .get_package_records(&id, body.log_length, since.as_ref(), limit)
143            .await?
144            .into_iter()
145            .map(|envelope| {
146                // use the record ID as the fetch token
147                let fetch_token =
148                    RecordId::package_record::<Sha256>(&envelope.envelope).to_string();
149                PublishedRecord {
150                    envelope: envelope.into(),
151                    fetch_token,
152                }
153            })
154            .collect();
155        more |= records.len() == limit as usize;
156        map.insert(id, records);
157    }
158
159    Ok(Json(FetchLogsResponse {
160        more,
161        operator,
162        packages: map,
163        warnings: Vec::default(),
164    }))
165}
166
167#[debug_handler]
168async fn fetch_checkpoint(
169    State(config): State<Config>,
170    RegistryHeader(_registry_header): RegistryHeader,
171) -> Result<Json<SerdeEnvelope<TimestampedCheckpoint>>, FetchApiError> {
172    Ok(Json(
173        config.core_service.store().get_latest_checkpoint().await?,
174    ))
175}
176
177#[debug_handler]
178async fn fetch_package_names(
179    State(config): State<Config>,
180    RegistryHeader(_registry_header): RegistryHeader,
181    Json(body): Json<FetchPackageNamesRequest<'static>>,
182) -> Result<Json<FetchPackageNamesResponse>, FetchApiError> {
183    let log_ids = if body.packages.len() > MAX_PACKAGE_NAMES_LIMIT {
184        body.packages.get(..MAX_PACKAGE_NAMES_LIMIT).unwrap()
185    } else {
186        &body.packages
187    };
188
189    let packages = config
190        .core_service
191        .store()
192        .get_package_names(log_ids)
193        .await?;
194
195    Ok(Json(FetchPackageNamesResponse { packages }))
196}