warg_server/api/v1/
fetch.rs1use super::{Json, RegistryHeader};
2use crate::datastore::DataStoreError;
3use crate::services::CoreService;
4use axum::http::StatusCode;
5use axum::{
6 debug_handler,
7 extract::State,
8 response::IntoResponse,
9 routing::{get, post},
10 Router,
11};
12use indexmap::IndexMap;
13use warg_api::v1::fetch::{
14 FetchError, FetchLogsRequest, FetchLogsResponse, FetchPackageNamesRequest,
15 FetchPackageNamesResponse, PublishedRecord,
16};
17use warg_crypto::hash::{AnyHash, Sha256};
18use warg_protocol::registry::{LogId, RecordId, TimestampedCheckpoint};
19use warg_protocol::SerdeEnvelope;
20
/// Number of records returned per log when a fetch-logs request omits `limit`.
const DEFAULT_RECORDS_LIMIT: u16 = 100;
/// Largest `limit` a fetch-logs request may specify; larger values are rejected.
const MAX_RECORDS_LIMIT: u16 = 1000;

/// Maximum number of log ids resolved by a single package-names request;
/// any ids beyond this count are ignored.
const MAX_PACKAGE_NAMES_LIMIT: usize = 1000;
25
26#[derive(Clone)]
27pub struct Config {
28 core_service: CoreService,
29}
30
31impl Config {
32 pub fn new(core_service: CoreService) -> Self {
33 Self { core_service }
34 }
35
36 pub fn into_router(self) -> Router {
37 Router::new()
38 .route("/checkpoint", get(fetch_checkpoint))
39 .route("/logs", post(fetch_logs))
40 .route("/names", post(fetch_package_names))
41 .with_state(self)
42 }
43}
44
45struct FetchApiError(FetchError);
46
47impl FetchApiError {
48 fn bad_request(message: impl ToString) -> Self {
49 Self(FetchError::Message {
50 status: StatusCode::BAD_REQUEST.as_u16(),
51 message: message.to_string(),
52 })
53 }
54}
55
56impl From<DataStoreError> for FetchApiError {
57 fn from(e: DataStoreError) -> Self {
58 Self(match e {
59 DataStoreError::CheckpointNotFound(checkpoint) => {
60 FetchError::CheckpointNotFound(checkpoint)
61 }
62 DataStoreError::LogNotFound(log_id) => FetchError::LogNotFound(log_id),
63 DataStoreError::RecordNotFound(record_id) => {
64 FetchError::FetchTokenNotFound(record_id.to_string())
65 }
66 e => {
68 tracing::error!("unexpected data store error: {e}");
69 FetchError::Message {
70 status: StatusCode::INTERNAL_SERVER_ERROR.as_u16(),
71 message: "an error occurred while processing the request".into(),
72 }
73 }
74 })
75 }
76}
77
78impl IntoResponse for FetchApiError {
79 fn into_response(self) -> axum::response::Response {
80 (StatusCode::from_u16(self.0.status()).unwrap(), Json(self.0)).into_response()
81 }
82}
83
84#[debug_handler]
85async fn fetch_logs(
86 State(config): State<Config>,
87 RegistryHeader(_registry_header): RegistryHeader,
88 Json(body): Json<FetchLogsRequest<'static>>,
89) -> Result<Json<FetchLogsResponse>, FetchApiError> {
90 let limit = body.limit.unwrap_or(DEFAULT_RECORDS_LIMIT);
91 if limit == 0 || limit > MAX_RECORDS_LIMIT {
92 return Err(FetchApiError::bad_request(format!(
93 "invalid records limit value `{limit}`: must be between 1 and {MAX_RECORDS_LIMIT}"
94 )));
95 }
96
97 let operator_fetch_token: Option<RecordId> = match body.operator {
98 Some(s) => Some(
99 s.parse::<AnyHash>()
100 .map_err(|_| FetchApiError(FetchError::FetchTokenNotFound(s.into_owned())))?
101 .into(),
102 ),
103 None => None,
104 };
105 let operator: Vec<PublishedRecord> = config
106 .core_service
107 .store()
108 .get_operator_records(
109 &LogId::operator_log::<Sha256>(),
110 body.log_length,
111 operator_fetch_token.as_ref(),
112 limit,
113 )
114 .await?
115 .into_iter()
116 .map(|envelope| {
117 let fetch_token = RecordId::operator_record::<Sha256>(&envelope.envelope).to_string();
119 PublishedRecord {
120 envelope: envelope.into(),
121 fetch_token,
122 }
123 })
124 .collect();
125
126 let mut more = operator.len() == limit as usize;
127
128 let mut map = IndexMap::new();
129 let packages = body.packages.into_owned();
130 for (id, fetch_token) in packages {
131 let since: Option<RecordId> = match fetch_token {
132 Some(s) => Some(
133 s.parse::<AnyHash>()
134 .map_err(|_| FetchApiError(FetchError::FetchTokenNotFound(s)))?
135 .into(),
136 ),
137 None => None,
138 };
139 let records: Vec<PublishedRecord> = config
140 .core_service
141 .store()
142 .get_package_records(&id, body.log_length, since.as_ref(), limit)
143 .await?
144 .into_iter()
145 .map(|envelope| {
146 let fetch_token =
148 RecordId::package_record::<Sha256>(&envelope.envelope).to_string();
149 PublishedRecord {
150 envelope: envelope.into(),
151 fetch_token,
152 }
153 })
154 .collect();
155 more |= records.len() == limit as usize;
156 map.insert(id, records);
157 }
158
159 Ok(Json(FetchLogsResponse {
160 more,
161 operator,
162 packages: map,
163 warnings: Vec::default(),
164 }))
165}
166
167#[debug_handler]
168async fn fetch_checkpoint(
169 State(config): State<Config>,
170 RegistryHeader(_registry_header): RegistryHeader,
171) -> Result<Json<SerdeEnvelope<TimestampedCheckpoint>>, FetchApiError> {
172 Ok(Json(
173 config.core_service.store().get_latest_checkpoint().await?,
174 ))
175}
176
177#[debug_handler]
178async fn fetch_package_names(
179 State(config): State<Config>,
180 RegistryHeader(_registry_header): RegistryHeader,
181 Json(body): Json<FetchPackageNamesRequest<'static>>,
182) -> Result<Json<FetchPackageNamesResponse>, FetchApiError> {
183 let log_ids = if body.packages.len() > MAX_PACKAGE_NAMES_LIMIT {
184 body.packages.get(..MAX_PACKAGE_NAMES_LIMIT).unwrap()
185 } else {
186 &body.packages
187 };
188
189 let packages = config
190 .core_service
191 .store()
192 .get_package_names(log_ids)
193 .await?;
194
195 Ok(Json(FetchPackageNamesResponse { packages }))
196}