warg_server/api/v1/
ledger.rs

1use super::{Json, Path, RegistryHeader};
2use crate::datastore::DataStoreError;
3use crate::services::CoreService;
4use axum::http::StatusCode;
5use axum::{
6    debug_handler, extract::State, response::IntoResponse, response::Response, routing::get, Router,
7};
8use warg_api::v1::ledger::{
9    LedgerError, LedgerSource, LedgerSourceContentType, LedgerSourcesResponse,
10};
11use warg_crypto::hash::HashAlgorithm;
12use warg_protocol::registry::RegistryIndex;
13
/// Upper bound on the number of log leafs requested from the data store for a
/// single ledger records request; also used as the stride when the log is
/// split into ledger sources.
const MAX_LEDGER_RECORDS_LIMIT: usize = 1000;
15
/// State shared by the ledger API route handlers.
#[derive(Clone)]
pub struct Config {
    /// Core service whose data store the handlers query.
    core_service: CoreService,
}
20
21impl Config {
22    pub fn new(core_service: CoreService) -> Self {
23        Self { core_service }
24    }
25
26    pub fn into_router(self) -> Router {
27        Router::new()
28            .route("/", get(get_ledger_sources))
29            .route("/records/:start", get(get_ledger_records))
30            .with_state(self)
31    }
32}
33
/// Newtype around [`LedgerError`] used as the error type of the ledger
/// handlers in this module.
struct LedgerApiError(LedgerError);
35
36impl From<DataStoreError> for LedgerApiError {
37    fn from(e: DataStoreError) -> Self {
38        tracing::error!("unexpected data store error: {e}");
39
40        Self(LedgerError::Message {
41            status: StatusCode::INTERNAL_SERVER_ERROR.as_u16(),
42            message: "an error occurred while processing the request".into(),
43        })
44    }
45}
46
47impl IntoResponse for LedgerApiError {
48    fn into_response(self) -> axum::response::Response {
49        (StatusCode::from_u16(self.0.status()).unwrap(), Json(self.0)).into_response()
50    }
51}
52
53#[debug_handler]
54async fn get_ledger_sources(
55    State(config): State<Config>,
56    RegistryHeader(_registry_header): RegistryHeader,
57) -> Result<Json<LedgerSourcesResponse>, LedgerApiError> {
58    let log_length = config
59        .core_service
60        .store()
61        .get_latest_checkpoint()
62        .await?
63        .into_contents()
64        .checkpoint
65        .log_length;
66
67    let sources = (0..log_length)
68        .step_by(MAX_LEDGER_RECORDS_LIMIT)
69        .map(|start_index| {
70            let end_index = if start_index + MAX_LEDGER_RECORDS_LIMIT >= log_length {
71                log_length - 1
72            } else {
73                start_index + MAX_LEDGER_RECORDS_LIMIT - 1
74            };
75
76            LedgerSource {
77                first_registry_index: start_index,
78                last_registry_index: end_index,
79                url: format!("v1/ledger/records/{start_index}"),
80                accept_ranges: false,
81                content_type: LedgerSourceContentType::Packed,
82            }
83        })
84        .collect::<Vec<LedgerSource>>();
85
86    Ok(Json(LedgerSourcesResponse {
87        hash_algorithm: HashAlgorithm::Sha256,
88        sources,
89    }))
90}
91
92#[debug_handler]
93async fn get_ledger_records(
94    State(config): State<Config>,
95    Path(start): Path<RegistryIndex>,
96    RegistryHeader(_registry_header): RegistryHeader,
97) -> Result<Response, LedgerApiError> {
98    let log_leafs = config
99        .core_service
100        .store()
101        .get_log_leafs_starting_with_registry_index(start, MAX_LEDGER_RECORDS_LIMIT)
102        .await?;
103
104    let mut body: Vec<u8> = Vec::with_capacity(log_leafs.len() * 64);
105
106    for (_, leaf) in log_leafs.into_iter() {
107        body.extend_from_slice(leaf.log_id.as_ref());
108        body.extend_from_slice(leaf.record_id.as_ref());
109    }
110
111    Ok(Response::builder()
112        .status(200)
113        .header(
114            axum::http::header::CONTENT_TYPE,
115            LedgerSourceContentType::Packed.as_str(),
116        )
117        .body(body.into())
118        .unwrap())
119}